diff options
-rw-r--r-- | src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 31 |
1 files changed, 11 insertions, 20 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 643eae9..4bef985 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader; //import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil; + import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; @@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); @@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } public DMaaPKafkaMetaBroker( rrNvReadable settings, ZkClient zk, ConfigDb configDb,AdminClient client) { - //fSettings = settings; + fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - // fKafkaAdminClient = null; + } @@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } catch ( InterruptedException e ) { - //timer.fail ( "Timeout" ); + log.warn ( "Execution of describeTopics timed out." ); throw new ConfigDbException ( e ); } catch ( ExecutionException e ) { - //timer.fail ( "ExecutionError" ); + log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); throw new ConfigDbException ( e.getCause () ); } @@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, - ZKStringSerializer$.MODULE$); - String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); - if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; - ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false); - */ + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - //AdminUtils.deleteTopic(zkutils, topic); + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); |