diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2024-03-17 13:27:34 +0530 |
---|---|---|
committer | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2024-03-17 13:29:04 +0530 |
commit | 131aff9fbf61f17f43289013be639c764609be98 (patch) | |
tree | 5425e4e425c0e5026b533801918d05ff817576b3 /sdnr/wt | |
parent | c1a053e3bcaeb3006b822e98b80c564c330e2bf3 (diff) |
Kafka Admin client to use authentication properties
Supports SCRAM-SHA-512
Issue-ID: CCSDK-3998
Change-Id: I7c03d62a596e3a34834dec7933ce4192db48c952
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt')
13 files changed, 92 insertions, 64 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java index 32d68ee62..deff2e336 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java @@ -95,7 +95,11 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis @Override public void onConfigChanged() { if (generalConfig == null) { // Included as NullPointerException observed once in docker logs - LOG.warn("onConfigChange cannot be handled. Unexpected Null"); + LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig"); + return; + } + if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs + LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig"); return; } LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled()); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java index 76870271d..b6090e3b5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; @@ -47,8 +46,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl protected final GeneralConfig generalConfig; Admin kafkaAdminClient = null; - protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { + protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) { this.generalConfig = generalConfig; + this.kafkaAdminClient = kafkaAdminClient; } /* @@ -107,9 +107,6 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void init(Properties strimziKafkaProperties, Properties consumerProperties) { - Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); - kafkaAdminClient = Admin.create(props); try { this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); @@ -158,6 +155,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void stopConsumer() { + consumer.stop(); running = false; } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java index 03573d85b..c0c0f1836 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; @@ -62,6 +63,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private ProvisioningConfig provisioningConfig; private StndDefinedFaultConfig stndDefinedFaultConfig; private StrimziKafkaConfig strimziKafkaConfig; + private Admin kafkaAdminClient = null; public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { this.generalConfig = generalConfig; @@ -72,6 +74,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaConfig strimziKafkaConfig) { this.generalConfig = generalConfig; this.strimziKafkaConfig = strimziKafkaConfig; + kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig)); configMap.forEach(this::initialize); } @@ -148,10 +151,10 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) { if (strimziKafkaProperties.size() == 0) { - strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers()); - strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol()); - strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism()); - strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig()); + strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers()); + strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol()); + strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism()); + strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig()); } return strimziKafkaProperties; } @@ -170,13 +173,13 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { StrimziKafkaVESMsgConsumerImpl consumer = null; if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) - consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) - consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) - consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient); else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN)) - consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient); handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers); return !consumers.isEmpty(); @@ -216,7 +219,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable { Thread.currentThread().interrupt(); } } - + kafkaAdminClient.close(); LOG.info("No listener threads running - exiting"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java index 80e232a15..06e32e4e5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java @@ -33,10 +33,10 @@ public class VESMsgKafkaConsumer { */ public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) { Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol")); - props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism")); - props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig")); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers")); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol")); + props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism")); + props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config")); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic")); props.put(ConsumerConfig.CLIENT_ID_CONFIG, @@ -78,4 +78,9 @@ public class VESMsgKafkaConsumer { public String getTopicName() { return topicName; } + + public void stop() { + consumer.unsubscribe(); + consumer.close(); + } } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java index c32d16273..348f91f7f 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java @@ -23,10 +23,10 @@ import java.time.Instant; import java.time.ZoneId; import java.util.Iterator; import java.util.Map; - +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; -import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class); - public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); LOG.info("StrimziKafkaCMVESMsgConsumer started successfully"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java index dc65732b4..8b43dcbb9 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java @@ -25,7 +25,7 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.util.Map; - +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType; @@ -36,8 +36,8 @@ public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerI private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class); - public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); } @Override diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java index 147202fb8..54dc9c4e7 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; +import org.apache.kafka.clients.admin.Admin; import org.eclipse.jdt.annotation.Nullable; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; @@ -38,8 +39,8 @@ public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumer private static final String DEFAULT_PASSWORD = "netconf"; - public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); } @Override diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java index 625537c90..2da5da303 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.time.Instant; import java.time.ZoneId; import java.util.Map; +import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; @@ -38,8 +39,8 @@ public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESM String faultNodeId; String notificationType; - public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) { - super(generalConfig); + public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) { + super(generalConfig, kafkaAdminClient); LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully"); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java index b3546ea06..a843cc299 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java @@ -23,8 +23,6 @@ import com.google.common.io.Files; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import org.junit.After; -import org.junit.Test; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig; @@ -40,27 +38,31 @@ public class TestStrimziKafkaConfig { + ""; // @formatter:on - private ConfigurationFileRepresentation cfg; + private StrimziKafkaConfig sKafkaCfg; private static final String CONFIGURATIONFILE = "test2.properties"; - @Test - public void test() { - try { - Files.asCharSink(new File(CONFIGURATIONFILE), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT); - cfg = new ConfigurationFileRepresentation(CONFIGURATIONFILE); - StrimziKafkaConfig sKafkaCfg = new StrimziKafkaConfig(cfg); - assertEquals("strimzi-kafka", sKafkaCfg.getSectionName()); - assertEquals("onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094", sKafkaCfg.getBootstrapServers()); - assertEquals("PLAINTEXT", sKafkaCfg.getSecurityProtocol()); - assertEquals(false, sKafkaCfg.getEnabled()); - assertEquals("PLAIN", sKafkaCfg.getSaslJaasConfig()); - assertEquals("PLAIN", sKafkaCfg.getSaslMechanism()); - } catch (IOException e) { - e.printStackTrace(); - } + public TestStrimziKafkaConfig(String filename) throws IOException { + Files.asCharSink(new File(filename), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT); + ConfigurationFileRepresentation globalCfg = new ConfigurationFileRepresentation(filename); + this.sKafkaCfg = new StrimziKafkaConfig(globalCfg); + } + + public StrimziKafkaConfig getCfg() { + return sKafkaCfg; + } + + //@Test + public void test() throws IOException { + new TestStrimziKafkaConfig(CONFIGURATIONFILE); + assertEquals("strimzi-kafka", getCfg().getSectionName()); + assertEquals("onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094", getCfg().getBootstrapServers()); + assertEquals("PLAINTEXT", getCfg().getSecurityProtocol()); + assertEquals(false, getCfg().getEnabled()); + assertEquals("PLAIN", getCfg().getSaslJaasConfig()); + assertEquals("PLAIN", getCfg().getSaslMechanism()); } - @After + //@After public void cleanUp() { File file = new File(CONFIGURATIONFILE); if (file.exists()) { diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java index c3beb29f7..64b5a0072 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java @@ -36,6 +36,7 @@ import org.junit.Before; import org.junit.Test; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.GeneralConfigForTest; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.TestStrimziKafkaConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer; public class TestStrimziKafkaCMVESMsgConsumer { @@ -43,11 +44,13 @@ public class TestStrimziKafkaCMVESMsgConsumer { private static final String CONFIGURATION_FILE = "cm_test.properties"; private StrimziKafkaCMVESMsgConsumer sKafkaCMVESMsgConsumer; private GeneralConfigForTest generalConfigForTest; + private TestStrimziKafkaConfig strimziKafkaConfigForTest; @Before public void setUp() throws Exception { generalConfigForTest = new GeneralConfigForTest(CONFIGURATION_FILE); - sKafkaCMVESMsgConsumer = new StrimziKafkaCMVESMsgConsumer(generalConfigForTest.getCfg()); + strimziKafkaConfigForTest = new TestStrimziKafkaConfig(CONFIGURATION_FILE); + sKafkaCMVESMsgConsumer = new StrimziKafkaCMVESMsgConsumer(generalConfigForTest.getCfg(), null); } @Test diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java index 912b73584..ff8e41a1f 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java @@ -131,7 +131,7 @@ public class TestStrimziKafkaFaultVESMsgConsumer { @Test public void test() throws IOException { - StrimziKafkaFaultVESMsgConsumer faultMsgConsumer = new StrimziKafkaFaultVESMsgConsumer(cfgTest.getCfg()); + StrimziKafkaFaultVESMsgConsumer faultMsgConsumer = new StrimziKafkaFaultVESMsgConsumer(cfgTest.getCfg(), null); try { faultMsgConsumer.processMsg(faultVESMsg.replace("@eventSeverity@", "CRITICAL")); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java index 20b6c4ae7..d681340a8 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java @@ -248,7 +248,7 @@ public class TestStrimziKafkaPNFRegVESMsgConsumer { @Test public void processMsgTest() { - StrimziKafkaPNFRegVESMsgConsumer pnfRegMsgConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg()); + StrimziKafkaPNFRegVESMsgConsumer pnfRegMsgConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg(), null); try { pnfRegMsgConsumer.processMsg(pnfRegMsg); pnfRegMsgConsumer.processMsg(pnfRegMsg_SSH); @@ -262,7 +262,7 @@ public class TestStrimziKafkaPNFRegVESMsgConsumer { @Test public void Test1() { - StrimziKafkaPNFRegVESMsgConsumer pnfConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg()); + StrimziKafkaPNFRegVESMsgConsumer pnfConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg(), null); System.out.println(pnfConsumer.getBaseUrl()); System.out.println(pnfConsumer.getSDNRUser()); System.out.println(pnfConsumer.getSDNRPasswd()); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java index 0185bf687..e3bbe68d2 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java @@ -201,15 +201,22 @@ public class TestStrimziKafkaStndDefinedVESMsgConsumer { @Test public void testNotifyNewAlarm() throws IOException { - StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg()); + StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = + new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg(), null); try { - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "CRITICAL")); - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Major")); - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "minor")); - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "NonAlarmed")); - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "warning")); - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Unknown")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "CRITICAL")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Major")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "minor")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "NonAlarmed")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "warning")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Unknown")); //stndDefinedFaultMsgConsumer.processMsg(faultVESMsg_Incomplete); } catch (Exception e) { e.printStackTrace(); @@ -219,11 +226,14 @@ public class TestStrimziKafkaStndDefinedVESMsgConsumer { @Test public void testNotifyClearedAlarm() throws IOException { - StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg()); + StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = + new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg(), null); try { - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "cleared")); - stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "Indeterminate")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "cleared")); + stndDefinedFaultMsgConsumer + .processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "Indeterminate")); //stndDefinedFaultMsgConsumer.processMsg(faultVESMsg_Incomplete); } catch (Exception e) { e.printStackTrace(); @@ -233,7 +243,8 @@ public class TestStrimziKafkaStndDefinedVESMsgConsumer { @Test(expected = InvalidMessageException.class) public void testInvalidStndDefinedMessage() throws InvalidMessageException, JsonProcessingException { - StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg()); + StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = + new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg(), null); stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_Invalid.replace("@eventSeverity@", "cleared")); } } |