summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java36
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java2
3 files changed, 31 insertions, 11 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
index 3b3394454..ed468afba 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
@@ -36,10 +36,10 @@ public abstract class MessageConfig implements Configuration {
private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
- private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
+ private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "2000";
public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
- private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
+ private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "1000";
public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
index 249eb612e..76870271d 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
import org.slf4j.Logger;
@@ -42,6 +45,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
private boolean ready = false;
private int fetchPause = 5000; // Default pause between fetch - 5 seconds
protected final GeneralConfig generalConfig;
+ Admin kafkaAdminClient = null;
protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
@@ -54,22 +58,22 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
*/
@Override
public void run() {
-
if (ready) {
running = true;
while (running) {
try {
boolean noData = true;
List<String> consumerResponse = null;
- consumerResponse = consumer.poll();
- for (String msg : consumerResponse) {
- noData = false;
- LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
- if (isMessageValid(msg)) {
- processMsg(msg);
+ if (isTopicExists(consumer.getTopicName())) {
+ consumerResponse = consumer.poll();
+ for (String msg : consumerResponse) {
+ noData = false;
+ LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
+ if (isMessageValid(msg)) {
+ processMsg(msg);
+ }
}
}
-
if (noData) {
pauseThread();
}
@@ -103,6 +107,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
*/
@Override
public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
+ kafkaAdminClient = Admin.create(props);
try {
this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
@@ -122,6 +129,19 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
}
}
+ private boolean isTopicExists(String topicName) {
+ LOG.trace("Checking for existence of topic - {}", topicName);
+ try {
+ for (String kafkaTopic : kafkaAdminClient.listTopics().names().get()) {
+ if (kafkaTopic.equals(topicName))
+ return true;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Exception in isTopicExists method - ", e);
+ }
+ return false;
+ }
+
@Override
public boolean isReady() {
return ready;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
index 352db03f2..80e232a15 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
@@ -68,7 +68,7 @@ public class VESMsgKafkaConsumer {
*/
public List<String> poll() {
List<String> msgs = new ArrayList<>();
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout));
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeout));
for (ConsumerRecord<String, String> rec : records) {
msgs.add(rec.value());
}