summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-12-06 06:22:32 -0500
committersunil unnava <su622b@att.com>2018-12-06 11:45:36 +0000
commit83746dbc42bad55e52d4bed2617d0d0ca8634cb5 (patch)
tree6d6742b9680e59f4cc32edb7b61c189609bb96ef /src/main
parent2b6587421545cf4c21b4b9e211f80c5a394ff697 (diff)
Fix for Kafka Consumer is not safe error1.1.14
Issue-ID: DMAAP-896 Change-Id: I085dbad1248790796e220267cb3e603ecc6c1067 Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java12
1 files changed, 9 insertions, 3 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
index 347f625..2ec323e 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-
-
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -83,6 +82,12 @@ public class Kafka011Consumer implements Consumer {
state = Kafka011Consumer.State.OPENED;
kConsumer = cc;
fKafkaLiveLockAvoider = klla;
+
+ String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "consumer.timeout");
+ if (null != consumerTimeOut) {
+ consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
+ }
synchronized (kConsumer) {
kConsumer.subscribe(Arrays.asList(topic));
}
@@ -147,7 +152,7 @@ public class Kafka011Consumer implements Consumer {
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(future);
try {
- future.get(5, TimeUnit.SECONDS); // wait 1
+ future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
// second
} catch (TimeoutException ex) {
// timed out. Try to stop the code if possible.
@@ -370,6 +375,7 @@ public class Kafka011Consumer implements Consumer {
private long offset;
private Kafka011Consumer.State state;
private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+ private int consumerPollTimeOut=5;
private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;