diff options
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java')
-rw-r--r-- | sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java | 8 |
1 files changed, 3 insertions, 5 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java index 76870271d..b6090e3b5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; @@ -47,8 +46,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl protected final GeneralConfig generalConfig; Admin kafkaAdminClient = null; - protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { + protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) { this.generalConfig = generalConfig; + this.kafkaAdminClient = kafkaAdminClient; } /* @@ -107,9 +107,6 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void init(Properties strimziKafkaProperties, Properties consumerProperties) { - Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); - kafkaAdminClient = Admin.create(props); try { this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); @@ -158,6 +155,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl */ @Override public void stopConsumer() { + consumer.stop(); running = false; } |