summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java18
1 files changed, 3 insertions, 15 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index a4ae2be..1e20ee2 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -39,16 +39,13 @@ import org.apache.kafka.common.KafkaFuture;
import org.json.JSONObject;
import org.json.JSONArray;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-import org.onap.dmaap.dmf.mr.metabroker.Broker;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -94,7 +91,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
- //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
private final AdminClient fKafkaAdminClient;
@@ -109,7 +105,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
@Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
- //fSettings = settings;
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
@@ -149,7 +144,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public List<Topic> getAllTopics() throws ConfigDbException {
log.info("Retrieving list of all the topics.");
- final LinkedList<Topic> result = new LinkedList<Topic>();
+ final LinkedList<Topic> result = new LinkedList<>();
try {
log.info("Retrieving all topics from root: " + zkTopicsRoot);
final List<String> topics = fZk.getChildren(zkTopicsRoot);
@@ -229,7 +224,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
try
{
- final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+ final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
final KafkaFuture<Void> ctrResult = ctr.all ();
ctrResult.get ();
@@ -254,7 +249,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
log.info("Deleting topic: " + topic);
- ZkClient zkClient = null;
try {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
@@ -269,15 +263,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
throw new ConfigDbException(e);
} finally {
log.info("Closing zookeeper connection.");
- if (zkClient != null)
- zkClient.close();
}
-
- // throw new UnsupportedOperationException ( "We can't programmatically
- // delete Kafka topics yet." );
}
- //private final rrNvReadable fSettings;
private final ZkClient fZk;
private final ConfigDb fCambriaConfig;
private final ConfigPath fBaseTopicData;
@@ -486,7 +474,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public Set<String> getOwners() {
- final TreeSet<String> owners = new TreeSet<String> ();
+ final TreeSet<String> owners = new TreeSet<>();
owners.add ( fOwner );
return owners;
}