diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 18 |
1 files changed, 3 insertions, 15 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index a4ae2be..1e20ee2 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -39,16 +39,13 @@ import org.apache.kafka.common.KafkaFuture; import org.json.JSONObject; import org.json.JSONArray; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; -import org.onap.dmaap.dmf.mr.metabroker.Broker; import org.onap.dmaap.dmf.mr.metabroker.Broker1; import org.onap.dmaap.dmf.mr.metabroker.Topic; import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; import org.onap.dmaap.dmf.mr.utils.Utils; -//import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -94,7 +91,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } - //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); private final AdminClient fKafkaAdminClient; @@ -109,7 +105,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { */ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { - //fSettings = settings; fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); @@ -149,7 +144,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public List<Topic> getAllTopics() throws ConfigDbException { log.info("Retrieving list of all the topics."); - final LinkedList<Topic> result = new LinkedList<Topic>(); + final LinkedList<Topic> result = new LinkedList<>(); try { log.info("Retrieving all topics from root: " + zkTopicsRoot); final List<String> topics = fZk.getChildren(zkTopicsRoot); @@ -229,7 +224,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { try { - final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () ); + final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() ); final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); final KafkaFuture<Void> ctrResult = ctr.all (); ctrResult.get (); @@ -254,7 +249,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException { log.info("Deleting topic: " + topic); - ZkClient zkClient = null; try { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created @@ -269,15 +263,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 { throw new ConfigDbException(e); } finally { log.info("Closing zookeeper connection."); - if (zkClient != null) - zkClient.close(); } - - // throw new UnsupportedOperationException ( "We can't programmatically - // delete Kafka topics yet." ); } - //private final rrNvReadable fSettings; private final ZkClient fZk; private final ConfigDb fCambriaConfig; private final ConfigPath fBaseTopicData; @@ -486,7 +474,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public Set<String> getOwners() { - final TreeSet<String> owners = new TreeSet<String> (); + final TreeSet<String> owners = new TreeSet<>(); owners.add ( fOwner ); return owners; } |