summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java176
1 files changed, 94 insertions, 82 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 7a08345..ae7414e 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
@@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.Utils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;
@@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException;
//@Component
public class DMaaPKafkaMetaBroker implements Broker1 {
+ private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
+ private final AdminClient fKafkaAdminClient;
+ private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv().
+ getOrDefault("useZkTopicStore", "true"));
+ private final ZkClient fZk;
+ private final ConfigDb fCambriaConfig;
+ private final ConfigPath fBaseTopicData;
+ private static final String ZK_TOPICS_ROOT = "/brokers/topics";
+ private static final JSONObject kEmptyAcl = new JSONObject();
+
public DMaaPKafkaMetaBroker() {
fZk = null;
fCambriaConfig = null;
fBaseTopicData = null;
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (StringUtils.isEmpty(fkafkaBrokers)) {
+ "kafka.metadata.broker.list");
+ if (StringUtils.isEmpty(fkafkaBrokers)) {
fkafkaBrokers = "localhost:9092";
}
-
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
if(Utils.isCadiEnabled()){
props.putAll(Utils.addSaslProps());
}
fKafkaAdminClient=AdminClient.create ( props );
-
}
- private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
- private final AdminClient fKafkaAdminClient;
-
-
-
/**
* DMaaPKafkaMetaBroker constructor initializing
*
@@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @param configDb
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
fkafkaBrokers = "localhost:9092";
}
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
if(Utils.isCadiEnabled()){
props.putAll(Utils.addSaslProps());
}
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
fKafkaAdminClient=AdminClient.create ( props );
-
-
-
}
- public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
+ public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
-
-
-
}
@Override
public List<Topic> getAllTopics() throws ConfigDbException {
log.info("Retrieving list of all the topics.");
- final LinkedList<Topic> result = new LinkedList<>();
+ if (!GET_TOPICS_FROM_ZK) {
+ return getTopicsFromKafka();
+ }
+ return getTopicsFromZookeeper();
+ }
+
+ private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
+ LinkedList<Topic> res = new LinkedList<>();
+ final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
try {
- log.info("Retrieving all topics from root: " + zkTopicsRoot);
- final List<String> topics = fZk.getChildren(zkTopicsRoot);
+ for (String name: ltr.names().get()) {
+ res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e);
+ }
+ return res;
+ }
+
+ private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
+ final LinkedList<Topic> legacyResult = new LinkedList<>();
+ try {
+ log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
+ final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
for (String topic : topics) {
- result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+ legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
}
JSONObject dataObj = new JSONObject();
dataObj.put("topics", new JSONObject());
@@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
} catch (ZkNoNodeException excp) {
// very fresh kafka doesn't have any topics or a topics node
- log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+ log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp);
}
- return result;
+ return legacyResult;
}
@Override
public Topic getTopic(String topic) throws ConfigDbException {
- if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+ if (!GET_TOPICS_FROM_ZK) {
+ try {
+ for (String name : fKafkaAdminClient.listTopics().names().get()) {
+ if (name.equals(topic)) {
+ log.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+ return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+ return null;
+ }
+ } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
}
- // else: no such topic in kafka
+ // else: no such topic
return null;
}
@@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
- log.info("Creating topic: " + topic);
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ log.info("Creating topic: {}", topic);
try {
- log.info("Check if topic [" + topic + "] exist.");
+ log.info("Check if topic [{}] exist.", topic);
// first check for existence "our way"
final Topic t = getTopic(topic);
if (t != null) {
- log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+ log.info("Could not create topic [{}]. Topic Already exists.", topic);
throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
}
} catch (ConfigDbException e1) {
log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Couldn't check topic data in config db.");
+ "Couldn't check topic data in config db.");
}
// we only allow 3 replicas. (If we don't test this, we get weird
// results from the cluster,
// so explicit test and fail.)
if (replicas < 1 || replicas > 3) {
- log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+ log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
- "The replica count must be between 1 and 3.");
+ "The replica count must be between 1 and 3.");
}
if (partitions < 1) {
- log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+ log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
}
-
// create via kafka
-
try {
- final NewTopic topicRequest =
- new NewTopic(topic, partitions, (short)replicas);
- final CreateTopicsResult ctr =
- fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
final KafkaFuture<Void> ctrResult = ctr.all();
ctrResult.get();
// underlying Kafka topic created. now setup our API info
@@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
- log.info("Deleting topic: " + topic);
+ log.info("Deleting topic: {}", topic);
try {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
-
-
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
-
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
@@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
}
- private final ZkClient fZk;
- private final ConfigDb fCambriaConfig;
- private final ConfigPath fBaseTopicData;
-
- private static final String zkTopicsRoot = "/brokers/topics";
- private static final JSONObject kEmptyAcl = new JSONObject();
-
/**
* method Providing KafkaTopic Object associated with owner and
* transactionenabled or not
@@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
- throws ConfigDbException {
+ throws ConfigDbException {
return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
}
@@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
+ boolean transactionEnabled) throws ConfigDbException {
final JSONObject o = new JSONObject();
o.put("owner", owner);
o.put("description", desc);
o.put("txenabled", transactionEnabled);
- db.store(basePath.getChild(name), o.toString());
+ if (GET_TOPICS_FROM_ZK) {
+ db.store(basePath.getChild(name), o.toString());
+ }
return new KafkaTopic(name, db, basePath);
}
/**
- * class performing all user opearation like user is eligible to read,
- * write. permitting a user to write and read,
+ * class performing all user operation like user is eligible to read,
+ * write. permitting a user to write and read etc
*
* @author anowarul.islam
*
@@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @param baseTopic
* @throws ConfigDbException
*/
+
+ private final String fName;
+ private final ConfigDb fConfigDb;
+ private final ConfigPath fBaseTopicData;
+ private final String fOwner;
+ private final String fDesc;
+ private final NsaAcl fReaders;
+ private final NsaAcl fWriters;
+ private final boolean fTransactionEnabled;
+
public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
fName = name;
fConfigDb = configdb;
@@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void permitWritesFromUser(String pubId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, false, true, pubId);
}
@@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void permitReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, true, true, consumerId);
}
@Override
public void denyReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, true, false, consumerId);
}
private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
- throws ConfigDbException, AccessDeniedException{
- try
- {
+ throws ConfigDbException, AccessDeniedException{
+ try {
final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
// we have to assume we have current data, or load it again. for the expected use
// case, assuming we can overwrite the data is fine.
final JSONObject o = new JSONObject ();
@@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
- }
- catch ( ConfigDbException | AccessDeniedException x )
- {
+ } catch ( ConfigDbException | AccessDeniedException x ) {
+ log.info("Error when trying to update acl for key {}", key);
throw x;
}
@@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
return acl == null ? null : acl.serialize();
}
- private final String fName;
- private final ConfigDb fConfigDb;
- private final ConfigPath fBaseTopicData;
- private final String fOwner;
- private final String fDesc;
- private final NsaAcl fReaders;
- private final NsaAcl fWriters;
- private boolean fTransactionEnabled;
-
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}