aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk
diff options
context:
space:
mode:
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java6
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java21
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java13
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java6
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java5
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java5
8 files changed, 42 insertions, 30 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
index 32d68ee62..deff2e336 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
@@ -95,7 +95,11 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
@Override
public void onConfigChanged() {
if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
- LOG.warn("onConfigChange cannot be handled. Unexpected Null");
+ LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig");
+ return;
+ }
+ if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs
+ LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig");
return;
}
LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
index 76870271d..b6090e3b5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
@@ -47,8 +46,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
protected final GeneralConfig generalConfig;
Admin kafkaAdminClient = null;
- protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
+ protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) {
this.generalConfig = generalConfig;
+ this.kafkaAdminClient = kafkaAdminClient;
}
/*
@@ -107,9 +107,6 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
*/
@Override
public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
- Properties props = new Properties();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
- kafkaAdminClient = Admin.create(props);
try {
this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
@@ -158,6 +155,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
*/
@Override
public void stopConsumer() {
+ consumer.stop();
running = false;
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
index 03573d85b..c0c0f1836 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
@@ -62,6 +63,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
private ProvisioningConfig provisioningConfig;
private StndDefinedFaultConfig stndDefinedFaultConfig;
private StrimziKafkaConfig strimziKafkaConfig;
+ private Admin kafkaAdminClient = null;
public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
@@ -72,6 +74,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
StrimziKafkaConfig strimziKafkaConfig) {
this.generalConfig = generalConfig;
this.strimziKafkaConfig = strimziKafkaConfig;
+ kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig));
configMap.forEach(this::initialize);
}
@@ -148,10 +151,10 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
if (strimziKafkaProperties.size() == 0) {
- strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
- strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
- strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
- strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
+ strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers());
+ strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol());
+ strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism());
+ strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig());
}
return strimziKafkaProperties;
}
@@ -170,13 +173,13 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
StrimziKafkaVESMsgConsumerImpl consumer = null;
if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
- consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient);
else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
- consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
- consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient);
else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
- consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
return !consumers.isEmpty();
@@ -216,7 +219,7 @@ public class StrimziKafkaVESMsgConsumerMain implements Runnable {
Thread.currentThread().interrupt();
}
}
-
+ kafkaAdminClient.close();
LOG.info("No listener threads running - exiting");
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
index 80e232a15..06e32e4e5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
@@ -33,10 +33,10 @@ public class VESMsgKafkaConsumer {
*/
public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) {
Properties props = new Properties();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol"));
- props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism"));
- props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig"));
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers"));
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol"));
+ props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism"));
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config"));
props.put(ConsumerConfig.GROUP_ID_CONFIG,
consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic"));
props.put(ConsumerConfig.CLIENT_ID_CONFIG,
@@ -78,4 +78,9 @@ public class VESMsgKafkaConsumer {
public String getTopicName() {
return topicName;
}
+
+ public void stop() {
+ consumer.unsubscribe();
+ consumer.close();
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
index c32d16273..348f91f7f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
@@ -23,10 +23,10 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.Iterator;
import java.util.Map;
-
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +34,8 @@ public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl
private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
- public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
- super(generalConfig);
+ public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+ super(generalConfig, kafkaAdminClient);
LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
index dc65732b4..8b43dcbb9 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
-
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
@@ -36,8 +36,8 @@ public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerI
private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class);
- public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) {
- super(generalConfig);
+ public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+ super(generalConfig, kafkaAdminClient);
}
@Override
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
index 147202fb8..54dc9c4e7 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
+import org.apache.kafka.clients.admin.Admin;
import org.eclipse.jdt.annotation.Nullable;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
@@ -38,8 +39,8 @@ public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumer
private static final String DEFAULT_PASSWORD = "netconf";
- public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) {
- super(generalConfig);
+ public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+ super(generalConfig, kafkaAdminClient);
}
@Override
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
index 625537c90..2da5da303 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
@@ -38,8 +39,8 @@ public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESM
String faultNodeId;
String notificationType;
- public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) {
- super(generalConfig);
+ public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+ super(generalConfig, kafkaAdminClient);
LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully");
}