diff options
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java')
-rw-r--r-- | sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java index 03573d85b..c0c0f1836 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; @@ -62,6 +63,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private ProvisioningConfig provisioningConfig; private StndDefinedFaultConfig stndDefinedFaultConfig; private StrimziKafkaConfig strimziKafkaConfig; + private Admin kafkaAdminClient = null; public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { this.generalConfig = generalConfig; @@ -72,6 +74,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaConfig strimziKafkaConfig) { this.generalConfig = generalConfig; this.strimziKafkaConfig = strimziKafkaConfig; + kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig)); configMap.forEach(this::initialize); } @@ -148,10 +151,10 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) { if (strimziKafkaProperties.size() == 0) { - strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers()); - strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol()); - strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism()); - strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig()); + strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers()); + strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol()); + strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism()); + strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig()); } return strimziKafkaProperties; } @@ -170,13 +173,13 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaVESMsgConsumerImpl consumer = null; if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) - consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) - consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) - consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN)) - consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient); handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers); return !consumers.isEmpty(); @@ -216,7 +219,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { Thread.currentThread().interrupt(); } } - + kafkaAdminClient.close(); LOG.info("No listener threads running - exiting"); } |