aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java')
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java17
1 files changed, 3 insertions, 14 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
index 982ba5d..c8c92bb 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
@@ -53,21 +53,10 @@ public class SdcKafkaConsumer {
* @param configuration The config provided to the client
*/
public SdcKafkaConsumer(Configuration configuration) {
- Properties props = new Properties();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress());
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig());
- props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism());
- props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumerGroup());
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- consumer = new KafkaConsumer<>(props);
+ KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(configuration);
+ Properties props = kafkaCommonConfig.getConsumerProperties();
pollTimeout = configuration.getPollingTimeout();
+ consumer = new KafkaConsumer<>(props);
}
/**