diff options
3 files changed, 31 insertions, 11 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java index 3b3394454..ed468afba 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java @@ -36,10 +36,10 @@ public abstract class MessageConfig implements Configuration { private static final String DEFAULT_VALUE_CONSUMER_ID = "C1"; public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout"; - private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000"; + private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "2000"; public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit"; - private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000"; + private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "1000"; public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause"; private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000"; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java index 249eb612e..76870271d 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -24,6 +24,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; import org.slf4j.Logger; @@ -42,6 +45,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl private boolean ready = false; private int fetchPause = 5000; // Default pause between fetch - 5 seconds protected final GeneralConfig generalConfig; + Admin kafkaAdminClient = null; protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { this.generalConfig = generalConfig; @@ -54,22 +58,22 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void run() { - if (ready) { running = true; while (running) { try { boolean noData = true; List<String> consumerResponse = null; - consumerResponse = consumer.poll(); - for (String msg : consumerResponse) { - noData = false; - LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg); - if (isMessageValid(msg)) { - processMsg(msg); + if (isTopicExists(consumer.getTopicName())) { + consumerResponse = consumer.poll(); + for (String msg : consumerResponse) { + noData = false; + LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg); + if (isMessageValid(msg)) { + processMsg(msg); + } } } - if (noData) { pauseThread(); } @@ -103,6 +107,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void init(Properties strimziKafkaProperties, Properties consumerProperties) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); + kafkaAdminClient = Admin.create(props); try { this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); @@ -122,6 +129,19 @@ public abstract class StrimziKafkaVESMsgConsumerImpl } } + private boolean isTopicExists(String topicName) { + LOG.trace("Checking for existence of topic - {}", topicName); + try { + for (String kafkaTopic : kafkaAdminClient.listTopics().names().get()) { + if (kafkaTopic.equals(topicName)) + return true; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Exception in isTopicExists method - ", e); + } + return false; + } + @Override public boolean isReady() { return ready; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java index 352db03f2..80e232a15 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java @@ -68,7 +68,7 @@ public class VESMsgKafkaConsumer { */ public List<String> poll() { List<String> msgs = new ArrayList<>(); - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout)); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeout)); for (ConsumerRecord<String, String> rec : records) { msgs.add(rec.value()); } |