diff options
author | sunil unnava <sunil.unnava@att.com> | 2018-12-06 06:22:32 -0500 |
---|---|---|
committer | sunil unnava <su622b@att.com> | 2018-12-06 11:45:36 +0000 |
commit | 83746dbc42bad55e52d4bed2617d0d0ca8634cb5 (patch) | |
tree | 6d6742b9680e59f4cc32edb7b61c189609bb96ef | |
parent | 2b6587421545cf4c21b4b9e211f80c5a394ff697 (diff) |
Fix for Kafka Consumer is not safe error1.1.14
Issue-ID: DMAAP-896
Change-Id: I085dbad1248790796e220267cb3e603ecc6c1067
Signed-off-by: sunil unnava <sunil.unnava@att.com>
-rw-r--r-- | pom.xml | 2 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java | 12 | ||||
-rw-r--r-- | version.properties | 2 |
3 files changed, 11 insertions, 5 deletions
@@ -14,7 +14,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.onap.dmaap.messagerouter.msgrtr</groupId> <artifactId>msgrtr</artifactId> - <version>1.1.13-SNAPSHOT</version> + <version>1.1.14-SNAPSHOT</version> <packaging>jar</packaging> <name>dmaap-messagerouter-msgrtr</name> <description>Message Router - Restful interface built for kafka</description> diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java index 347f625..2ec323e 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException; import org.onap.dmaap.dmf.mr.backends.Consumer; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; - - +import com.att.ajsc.filemonitor.AJSCPropertiesMap; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -83,6 +82,12 @@ public class Kafka011Consumer implements Consumer { state = Kafka011Consumer.State.OPENED; kConsumer = cc; fKafkaLiveLockAvoider = klla; + + String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "consumer.timeout"); + if (null != consumerTimeOut) { + consumerPollTimeOut = Integer.parseInt(consumerTimeOut); + } synchronized (kConsumer) { kConsumer.subscribe(Arrays.asList(topic)); } @@ -147,7 +152,7 @@ public class Kafka011Consumer implements Consumer { ExecutorService service = Executors.newSingleThreadExecutor(); service.execute(future); try { - future.get(5, TimeUnit.SECONDS); // wait 1 + future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1 // second } catch (TimeoutException ex) { // timed out. Try to stop the code if possible. @@ -370,6 +375,7 @@ public class Kafka011Consumer implements Consumer { private long offset; private Kafka011Consumer.State state; private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; + private int consumerPollTimeOut=5; private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs; diff --git a/version.properties b/version.properties index 3dd3954..c9b51c7 100644 --- a/version.properties +++ b/version.properties @@ -27,7 +27,7 @@ major=1 minor=1 -patch=13 +patch=14 base_version=${major}.${minor}.${patch} |