summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java31
1 files changed, 11 insertions, 20 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 643eae9..4bef985 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
public DMaaPKafkaMetaBroker( rrNvReadable settings,
ZkClient zk, ConfigDb configDb,AdminClient client) {
- //fSettings = settings;
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
- // fKafkaAdminClient = null;
+
}
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
catch ( InterruptedException e )
{
- //timer.fail ( "Timeout" );
+
log.warn ( "Execution of describeTopics timed out." );
throw new ConfigDbException ( e );
}
catch ( ExecutionException e )
{
- //timer.fail ( "ExecutionError" );
+
log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
throw new ConfigDbException ( e.getCause () );
}
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
- /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
- ZKStringSerializer$.MODULE$);
- String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
- if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
- ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
- */
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
- //AdminUtils.deleteTopic(zkutils, topic);
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);