summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2022-03-22 15:45:45 +0000
committerefiacor <fiachra.corcoran@est.tech>2022-03-23 12:40:33 +0000
commite6afd43dbc9d4f5979bbc9dc1309d826aa8cf58d (patch)
tree5fd1585fa643e9acccf5fc1c453605308a06e761 /src/main
parentc5a65e41585490b9fa2b2427ecee15a2b66f11f6 (diff)
[DMAAP-MR] Get topics from kafka option1.4.0
Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: I8e21d249517f67ef2cbfe5511178e38b357f3d29 Issue-ID: DMAAP-1727
Diffstat (limited to 'src/main')
-rw-r--r--src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy22
-rw-r--r--src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml109
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java176
3 files changed, 171 insertions, 136 deletions
diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
index 0841a3a..e09c538 100644
--- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
+++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
@@ -3,20 +3,20 @@
xmlns jaxrs: "http://cxf.apache.org/jaxrs"
xmlns util: "http://www.springframework.org/schema/util"
- echoService(org.onap.dmaap.JaxrsEchoService)
- userService(org.onap.dmaap.JaxrsUserService)
+// echoService(org.onap.dmaap.JaxrsEchoService)
+// userService(org.onap.dmaap.JaxrsUserService)
topicService(org.onap.dmaap.service.TopicRestService)
eventService(org.onap.dmaap.service.EventsRestService)
- adminService(org.onap.dmaap.service.AdminRestService)
+// adminService(org.onap.dmaap.service.AdminRestService)
apiKeyService(org.onap.dmaap.service.ApiKeysRestService)
metricsService(org.onap.dmaap.service.MetricsRestService)
- transactionService(org.onap.dmaap.service.TransactionRestService)
- UIService(org.onap.dmaap.service.UIRestServices)
- mirrorService(org.onap.dmaap.service.MMRestService)
+// transactionService(org.onap.dmaap.service.TransactionRestService)
+// UIService(org.onap.dmaap.service.UIRestServices)
+// mirrorService(org.onap.dmaap.service.MMRestService)
- util.list(id: 'jaxrsServices') {
- ref(bean:'echoService')
- ref(bean:'userService')
-
- }
+// util.list(id: 'jaxrsServices') {
+// ref(bean:'echoService')
+// ref(bean:'userService')
+//
+// }
} \ No newline at end of file
diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
index 090a76b..20e4ceb 100644
--- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
+++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
@@ -5,45 +5,68 @@
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
-
+
<!-- Dependency Injection with annotations -->
<!-- <context:property-placeholder
location="file:/C:/Users/su622b/Desktop/testonap.properties"/> -->
- <!-- <context:property-placeholder
- location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> -->
-
- <context:component-scan
- base-package="org.onap.dmaap,org.onap.dmaap.filemonitor,org.onap.dmaap.mmagent,org.onap.dmaap.service,org.onap.dmaap.tools,org.onap.dmaap.util,org.onap.dmaap.filter,org.onap.dmaap.apiServer.metrics.cambria,
- org.onap.dmaap.dmf.mr,org.onap.dmaap.dmf.mr.backends,org.onap.dmaap.dmf.mr.backends.kafka,org.onap.dmaap.dmf.mr.backends.memory,org.onap.dmaap.dmf.mr.beans,org.onap.dmaap.dmf.mr.constants,org.onap.dmaap.dmf.mr.exception,
- org.onap.dmaap.dmf.mr.listener,org.onap.dmaap.dmf.mr.metabroker,org.onap.dmaap.dmf.mr.metrics.publisher,org.onap.dmaap.dmf.mr.metrics.publisher.impl,org.onap.dmaap.dmf.mr.resources,org.onap.dmaap.dmf.mr.resources.streamReaders,org.onap.dmaap.dmf.mr.security,
- org.onap.dmaap.dmf.mr.security.impl,org.onap.dmaap.dmf.mr.service,org.onap.dmaap.dmf.mr.service.impl,org.onap.dmaap.dmf.mr.transaction,org.onap.dmaap.dmf.mr.transaction.impl,org.onap.dmaap.dmf.mr.utils,
- com.att,com.att.dmf.mr.utils, com.att.dmf.mr, com.att.dmf.mr.rest,com.att.dmf.mr.service,
- com.att.dmf.mr.service.impl,com.att.dmf.mr.beans,com.att.dmf.mr.security,com.att.dmf.mr.exception,com.att.dmf.mr.backends,com.att.dmf.mr.backends.kafka,
- com.att.dmf.mr.transaction,com.att.dmf.mr.exception,com.att.nsa.dmaap,com.att.nsa.dmaap.service,com.att.nsa.dmaap.util,java.lang,java.util,com.att.dmf.mr.exception, com.att.dmf,com.att.nsa.dmaap.mmagent" />
- <context:property-placeholder
- location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/>
-
- <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider">
- <property name="dropRootElement" value="true" />
- <property name="supportUnwrapped" value="true" />
- </bean>
-
- <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" />
-
- <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" />
-
- <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" />
-
-
+ <!-- <context:property-placeholder
+ location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> -->
+
+ <context:component-scan
+ base-package="
+ org.onap.dmaap,org.onap.dmaap.filemonitor,
+ org.onap.dmaap.mmagent,
+ org.onap.dmaap.service,
+ org.onap.dmaap.tools,
+ org.onap.dmaap.util,
+ org.onap.dmaap.filter,
+ org.onap.dmaap.apiServer.metrics.cambria,
+ org.onap.dmaap.dmf.mr,
+ org.onap.dmaap.dmf.mr.backends,
+ org.onap.dmaap.dmf.mr.backends.kafka,
+ org.onap.dmaap.dmf.mr.backends.memory,
+ org.onap.dmaap.dmf.mr.beans,
+ org.onap.dmaap.dmf.mr.constants,
+ org.onap.dmaap.dmf.mr.exception,
+ org.onap.dmaap.dmf.mr.listener,
+ org.onap.dmaap.dmf.mr.metabroker,
+ org.onap.dmaap.dmf.mr.metrics.publisher,
+ org.onap.dmaap.dmf.mr.metrics.publisher.impl,
+ org.onap.dmaap.dmf.mr.resources,
+ org.onap.dmaap.dmf.mr.resources.streamReaders,
+ org.onap.dmaap.dmf.mr.security,
+ org.onap.dmaap.dmf.mr.security.impl,
+ org.onap.dmaap.dmf.mr.service,
+ org.onap.dmaap.dmf.mr.service.impl,
+ org.onap.dmaap.dmf.mr.transaction,
+ org.onap.dmaap.dmf.mr.transaction.impl,
+ org.onap.dmaap.dmf.mr.utils,
+ java.lang,
+ java.util" />
+ <context:property-placeholder
+ location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/>
+
+ <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider">
+ <property name="dropRootElement" value="true" />
+ <property name="supportUnwrapped" value="true" />
+ </bean>
+
+ <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" />
+
+ <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" />
+
+ <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" />
+
+
<!-- Your bean definitions goes here -->
-<!-- <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> -->
-<!-- <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> -->
- <bean id="servicePropsBean" name="servicePropsBean"
+ <!-- <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> -->
+ <!-- <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> -->
+ <bean id="servicePropsBean" name="servicePropsBean"
class="org.onap.dmaap.util.ServicePropertiesMapBean" />
-
- <!-- Msgrtr beans -->
- <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" />
- <bean
+
+ <!-- Msgrtr beans -->
+ <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" />
+ <bean
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<!-- Next value is the full qualified name of the static setter including
method name -->
@@ -60,7 +83,7 @@
class="com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter" />
<bean id="dMaaPMetricsSet" class="org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet">
- <constructor-arg ref="propertyReader" />
+ <constructor-arg ref="propertyReader" />
</bean>
<bean id="dMaaPZkClient" class=" org.onap.dmaap.dmf.mr.beans.DMaaPZkClient">
@@ -71,7 +94,7 @@
<constructor-arg ref="dMaaPZkClient" />
<constructor-arg ref="propertyReader" />
</bean>
-
+
<bean id="kafkaPublisher" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher">
<constructor-arg ref="propertyReader" />
@@ -82,13 +105,13 @@
<constructor-arg ref="dMaaPMetricsSet" />
<constructor-arg ref="kafkalockavoid" />
</bean> -->
-
- <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory">
+
+ <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory">
<constructor-arg ref="dMaaPMetricsSet" />
<constructor-arg ref="curator" />
<constructor-arg ref="kafkalockavoid" />
</bean>
-
+
<bean id="curator" class="org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory"
factory-method="getCurator">
@@ -125,9 +148,9 @@
<bean id="defLength" class="org.onap.dmaap.mr.filter.DefaultLength">
<property name="defaultLength" value="${maxcontentlength}"></property>
</bean>
-
- <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" />
-
- <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/>
+ <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" />
+
+
+ <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/>
</beans>
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 7a08345..ae7414e 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
@@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.Utils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;
@@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException;
//@Component
public class DMaaPKafkaMetaBroker implements Broker1 {
+ private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
+ private final AdminClient fKafkaAdminClient;
+ private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv().
+ getOrDefault("useZkTopicStore", "true"));
+ private final ZkClient fZk;
+ private final ConfigDb fCambriaConfig;
+ private final ConfigPath fBaseTopicData;
+ private static final String ZK_TOPICS_ROOT = "/brokers/topics";
+ private static final JSONObject kEmptyAcl = new JSONObject();
+
public DMaaPKafkaMetaBroker() {
fZk = null;
fCambriaConfig = null;
fBaseTopicData = null;
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (StringUtils.isEmpty(fkafkaBrokers)) {
+ "kafka.metadata.broker.list");
+ if (StringUtils.isEmpty(fkafkaBrokers)) {
fkafkaBrokers = "localhost:9092";
}
-
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
if(Utils.isCadiEnabled()){
props.putAll(Utils.addSaslProps());
}
fKafkaAdminClient=AdminClient.create ( props );
-
}
- private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
- private final AdminClient fKafkaAdminClient;
-
-
-
/**
* DMaaPKafkaMetaBroker constructor initializing
*
@@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @param configDb
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
fkafkaBrokers = "localhost:9092";
}
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
if(Utils.isCadiEnabled()){
props.putAll(Utils.addSaslProps());
}
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
fKafkaAdminClient=AdminClient.create ( props );
-
-
-
}
- public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
+ public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
-
-
-
}
@Override
public List<Topic> getAllTopics() throws ConfigDbException {
log.info("Retrieving list of all the topics.");
- final LinkedList<Topic> result = new LinkedList<>();
+ if (!GET_TOPICS_FROM_ZK) {
+ return getTopicsFromKafka();
+ }
+ return getTopicsFromZookeeper();
+ }
+
+ private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
+ LinkedList<Topic> res = new LinkedList<>();
+ final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
try {
- log.info("Retrieving all topics from root: " + zkTopicsRoot);
- final List<String> topics = fZk.getChildren(zkTopicsRoot);
+ for (String name: ltr.names().get()) {
+ res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e);
+ }
+ return res;
+ }
+
+ private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
+ final LinkedList<Topic> legacyResult = new LinkedList<>();
+ try {
+ log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
+ final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
for (String topic : topics) {
- result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+ legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
}
JSONObject dataObj = new JSONObject();
dataObj.put("topics", new JSONObject());
@@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
} catch (ZkNoNodeException excp) {
// very fresh kafka doesn't have any topics or a topics node
- log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+ log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp);
}
- return result;
+ return legacyResult;
}
@Override
public Topic getTopic(String topic) throws ConfigDbException {
- if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+ if (!GET_TOPICS_FROM_ZK) {
+ try {
+ for (String name : fKafkaAdminClient.listTopics().names().get()) {
+ if (name.equals(topic)) {
+ log.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+ return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+ return null;
+ }
+ } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
}
- // else: no such topic in kafka
+ // else: no such topic
return null;
}
@@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
- log.info("Creating topic: " + topic);
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ log.info("Creating topic: {}", topic);
try {
- log.info("Check if topic [" + topic + "] exist.");
+ log.info("Check if topic [{}] exist.", topic);
// first check for existence "our way"
final Topic t = getTopic(topic);
if (t != null) {
- log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+ log.info("Could not create topic [{}]. Topic Already exists.", topic);
throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
}
} catch (ConfigDbException e1) {
log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Couldn't check topic data in config db.");
+ "Couldn't check topic data in config db.");
}
// we only allow 3 replicas. (If we don't test this, we get weird
// results from the cluster,
// so explicit test and fail.)
if (replicas < 1 || replicas > 3) {
- log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+ log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
- "The replica count must be between 1 and 3.");
+ "The replica count must be between 1 and 3.");
}
if (partitions < 1) {
- log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+ log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
}
-
// create via kafka
-
try {
- final NewTopic topicRequest =
- new NewTopic(topic, partitions, (short)replicas);
- final CreateTopicsResult ctr =
- fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
final KafkaFuture<Void> ctrResult = ctr.all();
ctrResult.get();
// underlying Kafka topic created. now setup our API info
@@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
- log.info("Deleting topic: " + topic);
+ log.info("Deleting topic: {}", topic);
try {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
-
-
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
-
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
@@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
}
- private final ZkClient fZk;
- private final ConfigDb fCambriaConfig;
- private final ConfigPath fBaseTopicData;
-
- private static final String zkTopicsRoot = "/brokers/topics";
- private static final JSONObject kEmptyAcl = new JSONObject();
-
/**
* method Providing KafkaTopic Object associated with owner and
* transactionenabled or not
@@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
- throws ConfigDbException {
+ throws ConfigDbException {
return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
}
@@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
+ boolean transactionEnabled) throws ConfigDbException {
final JSONObject o = new JSONObject();
o.put("owner", owner);
o.put("description", desc);
o.put("txenabled", transactionEnabled);
- db.store(basePath.getChild(name), o.toString());
+ if (GET_TOPICS_FROM_ZK) {
+ db.store(basePath.getChild(name), o.toString());
+ }
return new KafkaTopic(name, db, basePath);
}
/**
- * class performing all user opearation like user is eligible to read,
- * write. permitting a user to write and read,
+ * class performing all user operation like user is eligible to read,
+ * write. permitting a user to write and read etc
*
* @author anowarul.islam
*
@@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @param baseTopic
* @throws ConfigDbException
*/
+
+ private final String fName;
+ private final ConfigDb fConfigDb;
+ private final ConfigPath fBaseTopicData;
+ private final String fOwner;
+ private final String fDesc;
+ private final NsaAcl fReaders;
+ private final NsaAcl fWriters;
+ private final boolean fTransactionEnabled;
+
public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
fName = name;
fConfigDb = configdb;
@@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void permitWritesFromUser(String pubId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, false, true, pubId);
}
@@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void permitReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, true, true, consumerId);
}
@Override
public void denyReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, true, false, consumerId);
}
private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
- throws ConfigDbException, AccessDeniedException{
- try
- {
+ throws ConfigDbException, AccessDeniedException{
+ try {
final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
// we have to assume we have current data, or load it again. for the expected use
// case, assuming we can overwrite the data is fine.
final JSONObject o = new JSONObject ();
@@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
- }
- catch ( ConfigDbException | AccessDeniedException x )
- {
+ } catch ( ConfigDbException | AccessDeniedException x ) {
+ log.info("Error when trying to update acl for key {}", key);
throw x;
}
@@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
return acl == null ? null : acl.serialize();
}
- private final String fName;
- private final ConfigDb fConfigDb;
- private final ConfigPath fBaseTopicData;
- private final String fOwner;
- private final String fDesc;
- private final NsaAcl fReaders;
- private final NsaAcl fWriters;
- private boolean fTransactionEnabled;
-
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}