aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'sdc-distribution-client/src/main/java/org')
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java18
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java14
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java2
3 files changed, 34 insertions, 0 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
index 184dca4..a8ce1c7 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
@@ -67,6 +67,24 @@ public interface IConfiguration {
}
/**
+ * Kafka consumer max.poll.interval.ms
+ *
+ * @return Kafka max.poll.interval.ms. Default is 300 seconds
+ */
+ default int getKafkaConsumerMaxPollInterval() {
+ return 300;
+ }
+
+ /**
+ * Kafka consumer session.timeout.ms
+ *
+ * @return Kafka session.timeout.ms. Default is 45 seconds
+ */
+ default int getKafkaConsumerSessionTimeout() {
+ return 45;
+ }
+
+ /**
* User Name for SDC distribution consumer authentication.
*
* @return User Name.
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
index 24f3225..dd67656 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
@@ -30,6 +30,8 @@ public class Configuration implements IConfiguration {
private final String kafkaSecurityProtocolConfig;
private final String kafkaSaslMechanism;
private final String kafkaSaslJaasConfig;
+ private final int kafkaConsumerMaxPollInterval;
+ private final int kafkaConsumerSessionTimeout;
private String sdcStatusTopicName;
private String sdcNotificationTopicName;
private String sdcAddress;
@@ -77,6 +79,8 @@ public class Configuration implements IConfiguration {
this.httpsProxyHost = other.getHttpsProxyHost();
this.httpsProxyPort = other.getHttpsProxyPort();
this.useSystemProxy = other.isUseSystemProxy();
+ this.kafkaConsumerMaxPollInterval = other.getKafkaConsumerMaxPollInterval();
+ this.kafkaConsumerSessionTimeout = other.getKafkaConsumerSessionTimeout();
}
@Override
@@ -100,6 +104,16 @@ public class Configuration implements IConfiguration {
}
@Override
+ public int getKafkaConsumerMaxPollInterval() {
+ return kafkaConsumerMaxPollInterval;
+ }
+
+ @Override
+ public int getKafkaConsumerSessionTimeout() {
+ return kafkaConsumerSessionTimeout;
+ }
+
+ @Override
public Boolean isUseHttpsWithSDC() {
return useHttpsWithSDC;
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
index 91b41a9..f87b7aa 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
@@ -61,6 +61,8 @@ public class SdcKafkaConsumer {
props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);