From 131aff9fbf61f17f43289013be639c764609be98 Mon Sep 17 00:00:00 2001 From: Ravi Pendurty Date: Sun, 17 Mar 2024 13:27:34 +0530 Subject: Kafka Admin client to use authentication properties Supports SCRAM-SHA-512 Issue-ID: CCSDK-3998 Change-Id: I7c03d62a596e3a34834dec7933ce4192db48c952 Signed-off-by: Ravi Pendurty --- .../impl/MountpointRegistrarImpl.java | 6 +++++- .../impl/StrimziKafkaVESMsgConsumerImpl.java | 8 +++----- .../impl/StrimziKafkaVESMsgConsumerMain.java | 21 ++++++++++++--------- .../kafka/VESMsgKafkaConsumer.java | 13 +++++++++---- .../vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java | 8 ++++---- .../fault/StrimziKafkaFaultVESMsgConsumer.java | 6 +++--- .../pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java | 5 +++-- .../StrimziKafkaStndDefinedFaultVESMsgConsumer.java | 5 +++-- 8 files changed, 42 insertions(+), 30 deletions(-) (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org') diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java index 32d68ee62..deff2e336 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java @@ -95,7 +95,11 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis @Override public void onConfigChanged() { if (generalConfig == null) { // Included as NullPointerException observed once in docker logs - LOG.warn("onConfigChange cannot be handled. Unexpected Null"); + LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig"); + return; + } + if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs + LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig"); return; } LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled()); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java index 76870271d..b6090e3b5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; @@ -47,8 +46,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl protected final GeneralConfig generalConfig; Admin kafkaAdminClient = null; - protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { + protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) { this.generalConfig = generalConfig; + this.kafkaAdminClient = kafkaAdminClient; } /* @@ -107,9 +107,6 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void init(Properties strimziKafkaProperties, Properties consumerProperties) { - Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); - kafkaAdminClient = Admin.create(props); try { this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); @@ -158,6 +155,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void stopConsumer() { + consumer.stop(); running = false; } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java index 03573d85b..c0c0f1836 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; @@ -62,6 +63,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private ProvisioningConfig provisioningConfig; private StndDefinedFaultConfig stndDefinedFaultConfig; private StrimziKafkaConfig strimziKafkaConfig; + private Admin kafkaAdminClient = null; public StrimziKafkaVESMsgConsumerMain(Map configMap, GeneralConfig generalConfig) { this.generalConfig = generalConfig; @@ -72,6 +74,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaConfig strimziKafkaConfig) { this.generalConfig = generalConfig; this.strimziKafkaConfig = strimziKafkaConfig; + kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig)); configMap.forEach(this::initialize); } @@ -148,10 +151,10 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) { if (strimziKafkaProperties.size() == 0) { - strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers()); - strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol()); - strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism()); - strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig()); + strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers()); + strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol()); + strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism()); + strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig()); } return strimziKafkaProperties; } @@ -170,13 +173,13 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaVESMsgConsumerImpl consumer = null; if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) - consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) - consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) - consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN)) - consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient); handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers); return !consumers.isEmpty(); @@ -216,7 +219,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { Thread.currentThread().interrupt(); } } - + kafkaAdminClient.close(); LOG.info("No listener threads running - exiting"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java index 80e232a15..06e32e4e5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java @@ -33,10 +33,10 @@ public class VESMsgKafkaConsumer { */ public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) { Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol")); - props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism")); - props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig")); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers")); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol")); + props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism")); + props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config")); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic")); props.put(ConsumerConfig.CLIENT_ID_CONFIG, @@ -78,4 +78,9 @@ public class VESMsgKafkaConsumer { public String getTopicName() { return topicName; } + + public void stop() { + consumer.unsubscribe(); + consumer.close(); + } } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java index c32d16273..348f91f7f 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java @@ -23,10 +23,10 @@ import java.time.Instant; import java.time.ZoneId; import java.util.Iterator; import java.util.Map; - +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; -import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class); - public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); LOG.info("StrimziKafkaCMVESMsgConsumer started successfully"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java index dc65732b4..8b43dcbb9 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java @@ -25,7 +25,7 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.util.Map; - +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType; @@ -36,8 +36,8 @@ public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerI private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class); - public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); } @Override diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java index 147202fb8..54dc9c4e7 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; +import org.apache.kafka.clients.admin.Admin; import org.eclipse.jdt.annotation.Nullable; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; @@ -38,8 +39,8 @@ public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumer private static final String DEFAULT_PASSWORD = "netconf"; - public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); } @Override diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java index 625537c90..2da5da303 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.time.Instant; import java.time.ZoneId; import java.util.Map; +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; @@ -38,8 +39,8 @@ public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESM String faultNodeId; String notificationType; - public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully"); } -- cgit 1.2.3-korg