diff options
author | Michael Dürre <michael.duerre@highstreet-technologies.com> | 2024-08-15 10:04:54 +0200 |
---|---|---|
committer | Dan Timoney <dtimoney@att.com> | 2024-09-12 15:48:04 -0400 |
commit | 7964e8aa9b9748678f6dadfe7b0c17c936679351 (patch) | |
tree | f4859812acd6ccccdaa698c567bb121f03f7b985 /sdnr/wt/mountpoint-registrar/provider/src/main/java | |
parent | 556306410bc9c3a421cde351d287adeb705d1e8c (diff) |
migrate sdnr features to potassium
adapt features and poms for potassium-sr2
Issue-ID: CCSDK-4046
Change-Id: I67e8fc442811eca40c19f1f3777f1a36812d3e35
Signed-off-by: Michael Dürre <michael.duerre@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java')
8 files changed, 30 insertions, 37 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java index 92367e660..ed468afba 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java @@ -17,6 +17,7 @@ * ============LICENSE_END========================================================================== */ + package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java index b6090e3b5..76870271d 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; @@ -46,9 +47,8 @@ public abstract class StrimziKafkaVESMsgConsumerImpl protected final GeneralConfig generalConfig; Admin kafkaAdminClient = null; - protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) { + protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { this.generalConfig = generalConfig; - this.kafkaAdminClient = kafkaAdminClient; } /* @@ -107,6 +107,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void init(Properties strimziKafkaProperties, Properties consumerProperties) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); + kafkaAdminClient = Admin.create(props); try { this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); @@ -155,7 +158,6 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void stopConsumer() { - consumer.stop(); running = false; } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java index c0c0f1836..03573d85b 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java @@ -23,7 +23,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; @@ -63,7 +62,6 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private ProvisioningConfig provisioningConfig; private StndDefinedFaultConfig stndDefinedFaultConfig; private StrimziKafkaConfig strimziKafkaConfig; - private Admin kafkaAdminClient = null; public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { this.generalConfig = generalConfig; @@ -74,7 +72,6 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaConfig strimziKafkaConfig) { this.generalConfig = generalConfig; this.strimziKafkaConfig = strimziKafkaConfig; - kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig)); configMap.forEach(this::initialize); } @@ -151,10 +148,10 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) { if (strimziKafkaProperties.size() == 0) { - strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers()); - strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol()); - strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism()); - strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig()); + strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers()); + strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol()); + strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism()); + strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig()); } return strimziKafkaProperties; } @@ -173,13 +170,13 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaVESMsgConsumerImpl consumer = null; if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) - consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient); + consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig); else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) - consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient); + consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig); else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) - consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient); + consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig); else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN)) - consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient); + consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig); handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers); return !consumers.isEmpty(); @@ -219,7 +216,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { Thread.currentThread().interrupt(); } } - kafkaAdminClient.close(); + LOG.info("No listener threads running - exiting"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java index 06e32e4e5..80e232a15 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java @@ -33,10 +33,10 @@ public class VESMsgKafkaConsumer { */ public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) { Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers")); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol")); - props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism")); - props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config")); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol")); + props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism")); + props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig")); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic")); props.put(ConsumerConfig.CLIENT_ID_CONFIG, @@ -78,9 +78,4 @@ public class VESMsgKafkaConsumer { public String getTopicName() { return topicName; } - - public void stop() { - consumer.unsubscribe(); - consumer.close(); - } } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java index 348f91f7f..c32d16273 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java @@ -23,10 +23,10 @@ import java.time.Instant; import java.time.ZoneId; import java.util.Iterator; import java.util.Map; -import org.apache.kafka.clients.admin.Admin; + import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; -import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class); - public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { - super(generalConfig, kafkaAdminClient); + public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); LOG.info("StrimziKafkaCMVESMsgConsumer started successfully"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java index 8b43dcbb9..dc65732b4 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java @@ -25,7 +25,7 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.util.Map; -import org.apache.kafka.clients.admin.Admin; + import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType; @@ -36,8 +36,8 @@ public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerI private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class); - public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { - super(generalConfig, kafkaAdminClient); + public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); } @Override diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java index 54dc9c4e7..147202fb8 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; -import org.apache.kafka.clients.admin.Admin; import org.eclipse.jdt.annotation.Nullable; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; @@ -39,8 +38,8 @@ public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumer private static final String DEFAULT_PASSWORD = "netconf"; - public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { - super(generalConfig, kafkaAdminClient); + public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); } @Override diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java index 2da5da303..625537c90 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.JsonNode; import java.time.Instant; import java.time.ZoneId; import java.util.Map; -import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; @@ -39,8 +38,8 @@ public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESM String faultNodeId; String notificationType; - public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { - super(generalConfig, kafkaAdminClient); + public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully"); } |