diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
2 files changed, 137 insertions, 3 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java new file mode 100644 index 0000000000..b76f86ebeb --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.kafka; + +import io.cloudevents.CloudEvent; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +/** + * kafka Configuration for legacy and cloud events. + * + * @param <T> valid legacy event to be published over the wire. + */ +@Configuration +@EnableKafka +@RequiredArgsConstructor +public class KafkaTemplateConfig<T> { + + private final KafkaProperties kafkaProperties; + + /** + * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into + * application.yml and replaces value-serializer by JsonSerializer. + * + * @return legacy event producer instance. + */ + @Bean + public ProducerFactory<String, T> legacyEventProducerFactory() { + final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(); + producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(producerConfigProperties); + } + + /** + * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined + * into application.yml and replaces deserializer-value by JsonDeserializer. + * + * @return an instance of legacy consumer factory. + */ + @Bean + public ConsumerFactory<String, T> legacyEventConsumerFactory() { + final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(); + consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class); + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into + * application.yml with CloudEventSerializer. + * + * @return cloud event producer instance. + */ + @Bean + public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() { + final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(); + return new DefaultKafkaProducerFactory<>(producerConfigProperties); + } + + /** + * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined + * into application.yml having CloudEventDeserializer as deserializer-value. + * + * @return an instance of cloud consumer factory. + */ + @Bean + public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() { + final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(); + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. + * + * @return an instance of legacy Kafka template. + */ + @Bean + @Primary + public KafkaTemplate<String, T> legacyEventKafkaTemplate() { + final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); + kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); + return kafkaTemplate; + } + + /** + * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this. + * + * @return an instance of cloud Kafka template. + */ + @Bean + public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() { + final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory()); + kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory()); + return kafkaTemplate; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index d92316dc58..7b28b4cd5f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -20,6 +20,7 @@ package org.onap.cps.ncmp.api.impl.events; +import io.cloudevents.CloudEvent; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,7 +43,12 @@ import org.springframework.util.concurrent.ListenableFutureCallback; @RequiredArgsConstructor public class EventsPublisher<T> { - private final KafkaTemplate<String, T> eventKafkaTemplate; + /** Once all cps events will be modified to cloud compliant, will remove legacyKafkaEventTemplate with + it's java configuration file KafkaTemplateConfig. **/ + @Deprecated(forRemoval = true) + private final KafkaTemplate<String, T> legacyKafkaEventTemplate; + + private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate; /** * Generic Event publisher. @@ -54,7 +60,8 @@ public class EventsPublisher<T> { */ @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture<SendResult<String, T>> eventFuture + = legacyKafkaEventTemplate.send(topicName, eventKey, event); eventFuture.addCallback(handleCallback(topicName)); } @@ -70,7 +77,7 @@ public class EventsPublisher<T> { final ProducerRecord<String, T> producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(producerRecord); + final ListenableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord); eventFuture.addCallback(handleCallback(topicName)); } |