diff options
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java | 17 |
1 files changed, 3 insertions, 14 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java index 982ba5d..c8c92bb 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java @@ -53,21 +53,10 @@ public class SdcKafkaConsumer { * @param configuration The config provided to the client */ public SdcKafkaConsumer(Configuration configuration) { - Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress()); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig()); - props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism()); - props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumerGroup()); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); - props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000); - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - consumer = new KafkaConsumer<>(props); + KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(configuration); + Properties props = kafkaCommonConfig.getConsumerProperties(); pollTimeout = configuration.getPollingTimeout(); + consumer = new KafkaConsumer<>(props); } /** |