summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2023-04-04 11:32:00 +0100
committerefiacor <fiachra.corcoran@est.tech>2023-04-04 13:08:10 +0100
commitbcbf218b70363f9ee3569d2dfe9b32d6a90fa70a (patch)
tree2d3328e2de96bc4fe4e853e698dd52924b6a84ea
parent0157be91504d447f65d300a7445c02bc71f33353 (diff)
[SDC-DISTRO-CLIENT] Add additional kafka consumer config
Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: Iecd443479258b31d3a4e38699eea0a6e0f423f05 Issue-ID: SDC-4465
-rw-r--r--docs/conf.yaml7
-rw-r--r--pom.xml2
-rw-r--r--sdc-distribution-ci/pom.xml2
-rw-r--r--sdc-distribution-client/pom.xml2
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java18
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java14
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java2
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java6
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java38
-rw-r--r--version.properties2
10 files changed, 56 insertions, 37 deletions
diff --git a/docs/conf.yaml b/docs/conf.yaml
deleted file mode 100644
index ab59281..0000000
--- a/docs/conf.yaml
+++ /dev/null
@@ -1,7 +0,0 @@
----
-project_cfg: onap
-project: onap
-
-# Change this to ReleaseBranchName to modify the header
-default-version: latest
-#
diff --git a/pom.xml b/pom.xml
index 5bf46b2..069999c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.0.1-SNAPSHOT</version>
+ <version>2.0.2-SNAPSHOT</version>
<packaging>pom</packaging>
<name>sdc-sdc-distribution-client</name>
diff --git a/sdc-distribution-ci/pom.xml b/sdc-distribution-ci/pom.xml
index 48e0c9c..30bfa67 100644
--- a/sdc-distribution-ci/pom.xml
+++ b/sdc-distribution-ci/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.0.1-SNAPSHOT</version>
+ <version>2.0.2-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-ci</artifactId>
diff --git a/sdc-distribution-client/pom.xml b/sdc-distribution-client/pom.xml
index e46c61b..eb72205 100644
--- a/sdc-distribution-client/pom.xml
+++ b/sdc-distribution-client/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.0.1-SNAPSHOT</version>
+ <version>2.0.2-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-client</artifactId>
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
index 184dca4..a8ce1c7 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
@@ -67,6 +67,24 @@ public interface IConfiguration {
}
/**
+ * Kafka consumer max.poll.interval.ms
+ *
+ * @return Kafka max.poll.interval.ms. Default is 300 seconds
+ */
+ default int getKafkaConsumerMaxPollInterval() {
+ return 300;
+ }
+
+ /**
+ * Kafka consumer session.timeout.ms
+ *
+ * @return Kafka session.timeout.ms. Default is 45 seconds
+ */
+ default int getKafkaConsumerSessionTimeout() {
+ return 45;
+ }
+
+ /**
* User Name for SDC distribution consumer authentication.
*
* @return User Name.
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
index 24f3225..dd67656 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
@@ -30,6 +30,8 @@ public class Configuration implements IConfiguration {
private final String kafkaSecurityProtocolConfig;
private final String kafkaSaslMechanism;
private final String kafkaSaslJaasConfig;
+ private final int kafkaConsumerMaxPollInterval;
+ private final int kafkaConsumerSessionTimeout;
private String sdcStatusTopicName;
private String sdcNotificationTopicName;
private String sdcAddress;
@@ -77,6 +79,8 @@ public class Configuration implements IConfiguration {
this.httpsProxyHost = other.getHttpsProxyHost();
this.httpsProxyPort = other.getHttpsProxyPort();
this.useSystemProxy = other.isUseSystemProxy();
+ this.kafkaConsumerMaxPollInterval = other.getKafkaConsumerMaxPollInterval();
+ this.kafkaConsumerSessionTimeout = other.getKafkaConsumerSessionTimeout();
}
@Override
@@ -100,6 +104,16 @@ public class Configuration implements IConfiguration {
}
@Override
+ public int getKafkaConsumerMaxPollInterval() {
+ return kafkaConsumerMaxPollInterval;
+ }
+
+ @Override
+ public int getKafkaConsumerSessionTimeout() {
+ return kafkaConsumerSessionTimeout;
+ }
+
+ @Override
public Boolean isUseHttpsWithSDC() {
return useHttpsWithSDC;
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
index 91b41a9..f87b7aa 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
@@ -61,6 +61,8 @@ public class SdcKafkaConsumer {
props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java
index 744e9cc..2037be6 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java
@@ -31,11 +31,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;
-import org.onap.sdc.api.consumer.IConfiguration;
import org.onap.sdc.impl.Configuration;
import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.onap.sdc.utils.kafka.SdcKafkaProducer;
@@ -75,6 +74,9 @@ class SdcKafkaTest {
consumer.subscribe(topicName);
consumer.poll();
+ Assertions.assertEquals(configuration.getKafkaConsumerMaxPollInterval(), 600);
+ Assertions.assertEquals(configuration.getKafkaConsumerSessionTimeout(), 50);
+
SdcKafkaProducer producer = new SdcKafkaProducer(configuration);
producer.send(topicName, "blah", "blah");
producer.send(topicName, "blah", "blah");
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
index 86f29be..529124e 100644
--- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
@@ -38,6 +38,8 @@ public class TestConfiguration implements IConfiguration {
private final String kafkaSecurityProtocolConfig;
private final String kafkaSaslMechanism;
private final String kafkaSaslJaasConfig;
+ private final int kafkaConsumerMaxPollInterval;
+ private final int kafkaConsumerSessionTimeout;
private String keyStorePath;
private String keyStorePassword;
private boolean activateServerTLSAuth;
@@ -52,30 +54,6 @@ public class TestConfiguration implements IConfiguration {
private String sdcStatusTopicName;
private String sdcNotificationTopicName;
- public TestConfiguration(IConfiguration other) {
- this.sdcAddress = other.getSdcAddress();
- this.comsumerID = other.getConsumerID();
- this.consumerGroup = other.getConsumerGroup();
- this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig();
- this.kafkaSaslMechanism = other.getKafkaSaslMechanism();
- this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig();
- this.environmentName = other.getEnvironmentName();
- this.password = other.getPassword();
- this.pollingInterval = other.getPollingInterval();
- this.pollingTimeout = other.getPollingTimeout();
- this.relevantArtifactTypes = other.getRelevantArtifactTypes();
- this.user = other.getUser();
- this.keyStorePath = other.getKeyStorePath();
- this.keyStorePassword = other.getKeyStorePassword();
- this.activateServerTLSAuth = other.activateServerTLSAuth();
- this.isFilterInEmptyResources = other.isFilterInEmptyResources();
- this.httpProxyHost = other.getHttpProxyHost();
- this.httpProxyPort = other.getHttpProxyPort();
- this.httpsProxyHost = other.getHttpsProxyHost();
- this.httpsProxyPort = other.getHttpsProxyPort();
- this.useSystemProxy = other.isUseSystemProxy();
- }
-
public TestConfiguration() {
this.sdcAddress = "localhost:8443";
this.comsumerID = "mso-123456";
@@ -101,6 +79,8 @@ public class TestConfiguration implements IConfiguration {
this.kafkaSecurityProtocolConfig = "SASL_PLAINTEXT";
this.kafkaSaslMechanism = "PLAIN";
this.kafkaSaslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;";
+ this.kafkaConsumerMaxPollInterval = 600;
+ this.kafkaConsumerSessionTimeout = 50;
this.httpProxyHost = "proxy";
this.httpProxyPort = 8080;
}
@@ -126,6 +106,16 @@ public class TestConfiguration implements IConfiguration {
}
@Override
+ public int getKafkaConsumerMaxPollInterval() {
+ return kafkaConsumerMaxPollInterval;
+ }
+
+ @Override
+ public int getKafkaConsumerSessionTimeout() {
+ return kafkaConsumerSessionTimeout;
+ }
+
+ @Override
public String getUser() {
return user;
}
diff --git a/version.properties b/version.properties
index b81f487..4354315 100644
--- a/version.properties
+++ b/version.properties
@@ -5,7 +5,7 @@
major=2
minor=0
-patch=1
+patch=2
base_version=${major}.${minor}.${patch}