summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java')
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java15
1 files changed, 15 insertions, 0 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
index cb617f9e..25ee92ae 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -124,4 +125,18 @@ public class KafkaConfig<T> {
return kafkaTemplate;
}
+ /**
+ * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+ *
+ * @return an instance of cloud Kafka template.
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+ cloudEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(cloudEventConsumerFactory());
+ return containerFactory;
+ }
+
}