diff options
Diffstat (limited to 'sdc-distribution-client/src/main')
3 files changed, 34 insertions, 0 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java index 184dca4..a8ce1c7 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java @@ -67,6 +67,24 @@ public interface IConfiguration { } /** + * Kafka consumer max.poll.interval.ms + * + * @return Kafka max.poll.interval.ms. Default is 300 seconds + */ + default int getKafkaConsumerMaxPollInterval() { + return 300; + } + + /** + * Kafka consumer session.timeout.ms + * + * @return Kafka session.timeout.ms. Default is 45 seconds + */ + default int getKafkaConsumerSessionTimeout() { + return 45; + } + + /** * User Name for SDC distribution consumer authentication. * * @return User Name. diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java index 24f3225..dd67656 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java @@ -30,6 +30,8 @@ public class Configuration implements IConfiguration { private final String kafkaSecurityProtocolConfig; private final String kafkaSaslMechanism; private final String kafkaSaslJaasConfig; + private final int kafkaConsumerMaxPollInterval; + private final int kafkaConsumerSessionTimeout; private String sdcStatusTopicName; private String sdcNotificationTopicName; private String sdcAddress; @@ -77,6 +79,8 @@ public class Configuration implements IConfiguration { this.httpsProxyHost = other.getHttpsProxyHost(); this.httpsProxyPort = other.getHttpsProxyPort(); this.useSystemProxy = other.isUseSystemProxy(); + this.kafkaConsumerMaxPollInterval = other.getKafkaConsumerMaxPollInterval(); + this.kafkaConsumerSessionTimeout = other.getKafkaConsumerSessionTimeout(); } @Override @@ -100,6 +104,16 @@ public class Configuration implements IConfiguration { } @Override + public int getKafkaConsumerMaxPollInterval() { + return kafkaConsumerMaxPollInterval; + } + + @Override + public int getKafkaConsumerSessionTimeout() { + return kafkaConsumerSessionTimeout; + } + + @Override public Boolean isUseHttpsWithSDC() { return useHttpsWithSDC; } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java index 91b41a9..f87b7aa 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java @@ -61,6 +61,8 @@ public class SdcKafkaConsumer { props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); |