aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java8
1 files changed, 3 insertions, 5 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
index 76870271d..b6090e3b5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
@@ -47,8 +46,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
protected final GeneralConfig generalConfig;
Admin kafkaAdminClient = null;
- protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
+ protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) {
this.generalConfig = generalConfig;
+ this.kafkaAdminClient = kafkaAdminClient;
}
/*
@@ -107,9 +107,6 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
*/
@Override
public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
- Properties props = new Properties();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
- kafkaAdminClient = Admin.create(props);
try {
this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
@@ -158,6 +155,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
*/
@Override
public void stopConsumer() {
+ consumer.stop();
running = false;
}