aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
diff options
context:
space:
mode:
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java21
1 files changed, 12 insertions, 9 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
index 03573d85b..c0c0f1836 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
@@ -62,6 +63,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
private ProvisioningConfig provisioningConfig;
private StndDefinedFaultConfig stndDefinedFaultConfig;
private StrimziKafkaConfig strimziKafkaConfig;
+ private Admin kafkaAdminClient = null;
public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
@@ -72,6 +74,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
StrimziKafkaConfig strimziKafkaConfig) {
this.generalConfig = generalConfig;
this.strimziKafkaConfig = strimziKafkaConfig;
+ kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig));
configMap.forEach(this::initialize);
}
@@ -148,10 +151,10 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
if (strimziKafkaProperties.size() == 0) {
- strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
- strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
- strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
- strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
+ strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers());
+ strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol());
+ strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism());
+ strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig());
}
return strimziKafkaProperties;
}
@@ -170,13 +173,13 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
StrimziKafkaVESMsgConsumerImpl consumer = null;
if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
- consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient);
else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
- consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
- consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient);
else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
- consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
return !consumers.isEmpty();
@@ -216,7 +219,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
Thread.currentThread().interrupt();
}
}
-
+ kafkaAdminClient.close();
LOG.info("No listener threads running - exiting");
}