diff options
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java')
-rw-r--r-- | sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java index 80e232a15..06e32e4e5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java @@ -33,10 +33,10 @@ public class VESMsgKafkaConsumer { */ public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) { Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol")); - props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism")); - props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig")); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers")); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol")); + props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism")); + props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config")); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic")); props.put(ConsumerConfig.CLIENT_ID_CONFIG, @@ -78,4 +78,9 @@ public class VESMsgKafkaConsumer { public String getTopicName() { return topicName; } + + public void stop() { + consumer.unsubscribe(); + consumer.close(); + } } |