diff options
author | efiacor <fiachra.corcoran@est.tech> | 2022-03-22 15:45:45 +0000 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2022-03-23 12:40:33 +0000 |
commit | e6afd43dbc9d4f5979bbc9dc1309d826aa8cf58d (patch) | |
tree | 5fd1585fa643e9acccf5fc1c453605308a06e761 /src/main | |
parent | c5a65e41585490b9fa2b2427ecee15a2b66f11f6 (diff) |
[DMAAP-MR] Get topics from kafka option1.4.0
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: I8e21d249517f67ef2cbfe5511178e38b357f3d29
Issue-ID: DMAAP-1727
Diffstat (limited to 'src/main')
3 files changed, 171 insertions, 136 deletions
diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy index 0841a3a..e09c538 100644 --- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy +++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy @@ -3,20 +3,20 @@ xmlns jaxrs: "http://cxf.apache.org/jaxrs" xmlns util: "http://www.springframework.org/schema/util" - echoService(org.onap.dmaap.JaxrsEchoService) - userService(org.onap.dmaap.JaxrsUserService) +// echoService(org.onap.dmaap.JaxrsEchoService) +// userService(org.onap.dmaap.JaxrsUserService) topicService(org.onap.dmaap.service.TopicRestService) eventService(org.onap.dmaap.service.EventsRestService) - adminService(org.onap.dmaap.service.AdminRestService) +// adminService(org.onap.dmaap.service.AdminRestService) apiKeyService(org.onap.dmaap.service.ApiKeysRestService) metricsService(org.onap.dmaap.service.MetricsRestService) - transactionService(org.onap.dmaap.service.TransactionRestService) - UIService(org.onap.dmaap.service.UIRestServices) - mirrorService(org.onap.dmaap.service.MMRestService) +// transactionService(org.onap.dmaap.service.TransactionRestService) +// UIService(org.onap.dmaap.service.UIRestServices) +// mirrorService(org.onap.dmaap.service.MMRestService) - util.list(id: 'jaxrsServices') { - ref(bean:'echoService') - ref(bean:'userService') - - } +// util.list(id: 'jaxrsServices') { +// ref(bean:'echoService') +// ref(bean:'userService') +// +// } }
\ No newline at end of file diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml index 090a76b..20e4ceb 100644 --- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml +++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml @@ -5,45 +5,68 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> - + <!-- Dependency Injection with annotations --> <!-- <context:property-placeholder location="file:/C:/Users/su622b/Desktop/testonap.properties"/> --> - <!-- <context:property-placeholder - location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> --> - - <context:component-scan - base-package="org.onap.dmaap,org.onap.dmaap.filemonitor,org.onap.dmaap.mmagent,org.onap.dmaap.service,org.onap.dmaap.tools,org.onap.dmaap.util,org.onap.dmaap.filter,org.onap.dmaap.apiServer.metrics.cambria, - org.onap.dmaap.dmf.mr,org.onap.dmaap.dmf.mr.backends,org.onap.dmaap.dmf.mr.backends.kafka,org.onap.dmaap.dmf.mr.backends.memory,org.onap.dmaap.dmf.mr.beans,org.onap.dmaap.dmf.mr.constants,org.onap.dmaap.dmf.mr.exception, - org.onap.dmaap.dmf.mr.listener,org.onap.dmaap.dmf.mr.metabroker,org.onap.dmaap.dmf.mr.metrics.publisher,org.onap.dmaap.dmf.mr.metrics.publisher.impl,org.onap.dmaap.dmf.mr.resources,org.onap.dmaap.dmf.mr.resources.streamReaders,org.onap.dmaap.dmf.mr.security, - org.onap.dmaap.dmf.mr.security.impl,org.onap.dmaap.dmf.mr.service,org.onap.dmaap.dmf.mr.service.impl,org.onap.dmaap.dmf.mr.transaction,org.onap.dmaap.dmf.mr.transaction.impl,org.onap.dmaap.dmf.mr.utils, - com.att,com.att.dmf.mr.utils, com.att.dmf.mr, com.att.dmf.mr.rest,com.att.dmf.mr.service, - com.att.dmf.mr.service.impl,com.att.dmf.mr.beans,com.att.dmf.mr.security,com.att.dmf.mr.exception,com.att.dmf.mr.backends,com.att.dmf.mr.backends.kafka, - com.att.dmf.mr.transaction,com.att.dmf.mr.exception,com.att.nsa.dmaap,com.att.nsa.dmaap.service,com.att.nsa.dmaap.util,java.lang,java.util,com.att.dmf.mr.exception, com.att.dmf,com.att.nsa.dmaap.mmagent" /> - <context:property-placeholder - location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/> - - <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider"> - <property name="dropRootElement" value="true" /> - <property name="supportUnwrapped" value="true" /> - </bean> - - <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" /> - - <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" /> - - <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" /> - - + <!-- <context:property-placeholder + location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> --> + + <context:component-scan + base-package=" + org.onap.dmaap,org.onap.dmaap.filemonitor, + org.onap.dmaap.mmagent, + org.onap.dmaap.service, + org.onap.dmaap.tools, + org.onap.dmaap.util, + org.onap.dmaap.filter, + org.onap.dmaap.apiServer.metrics.cambria, + org.onap.dmaap.dmf.mr, + org.onap.dmaap.dmf.mr.backends, + org.onap.dmaap.dmf.mr.backends.kafka, + org.onap.dmaap.dmf.mr.backends.memory, + org.onap.dmaap.dmf.mr.beans, + org.onap.dmaap.dmf.mr.constants, + org.onap.dmaap.dmf.mr.exception, + org.onap.dmaap.dmf.mr.listener, + org.onap.dmaap.dmf.mr.metabroker, + org.onap.dmaap.dmf.mr.metrics.publisher, + org.onap.dmaap.dmf.mr.metrics.publisher.impl, + org.onap.dmaap.dmf.mr.resources, + org.onap.dmaap.dmf.mr.resources.streamReaders, + org.onap.dmaap.dmf.mr.security, + org.onap.dmaap.dmf.mr.security.impl, + org.onap.dmaap.dmf.mr.service, + org.onap.dmaap.dmf.mr.service.impl, + org.onap.dmaap.dmf.mr.transaction, + org.onap.dmaap.dmf.mr.transaction.impl, + org.onap.dmaap.dmf.mr.utils, + java.lang, + java.util" /> + <context:property-placeholder + location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/> + + <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider"> + <property name="dropRootElement" value="true" /> + <property name="supportUnwrapped" value="true" /> + </bean> + + <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" /> + + <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" /> + + <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" /> + + <!-- Your bean definitions goes here --> -<!-- <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> --> -<!-- <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> --> - <bean id="servicePropsBean" name="servicePropsBean" + <!-- <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> --> + <!-- <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> --> + <bean id="servicePropsBean" name="servicePropsBean" class="org.onap.dmaap.util.ServicePropertiesMapBean" /> - - <!-- Msgrtr beans --> - <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" /> - <bean + + <!-- Msgrtr beans --> + <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" /> + <bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> <!-- Next value is the full qualified name of the static setter including method name --> @@ -60,7 +83,7 @@ class="com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter" /> <bean id="dMaaPMetricsSet" class="org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet"> - <constructor-arg ref="propertyReader" /> + <constructor-arg ref="propertyReader" /> </bean> <bean id="dMaaPZkClient" class=" org.onap.dmaap.dmf.mr.beans.DMaaPZkClient"> @@ -71,7 +94,7 @@ <constructor-arg ref="dMaaPZkClient" /> <constructor-arg ref="propertyReader" /> </bean> - + <bean id="kafkaPublisher" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher"> <constructor-arg ref="propertyReader" /> @@ -82,13 +105,13 @@ <constructor-arg ref="dMaaPMetricsSet" /> <constructor-arg ref="kafkalockavoid" /> </bean> --> - - <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory"> + + <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory"> <constructor-arg ref="dMaaPMetricsSet" /> <constructor-arg ref="curator" /> <constructor-arg ref="kafkalockavoid" /> </bean> - + <bean id="curator" class="org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory" factory-method="getCurator"> @@ -125,9 +148,9 @@ <bean id="defLength" class="org.onap.dmaap.mr.filter.DefaultLength"> <property name="defaultLength" value="${maxcontentlength}"></property> </bean> - - <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" /> - - <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/> + <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" /> + + + <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/> </beans> diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 7a08345..ae7414e 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.json.JSONArray; @@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.metabroker.Broker1; import org.onap.dmaap.dmf.mr.metabroker.Topic; -import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; import org.onap.dmaap.dmf.mr.utils.Utils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.StringUtils; @@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException; //@Component public class DMaaPKafkaMetaBroker implements Broker1 { + private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class); + private final AdminClient fKafkaAdminClient; + private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv(). + getOrDefault("useZkTopicStore", "true")); + private final ZkClient fZk; + private final ConfigDb fCambriaConfig; + private final ConfigPath fBaseTopicData; + private static final String ZK_TOPICS_ROOT = "/brokers/topics"; + private static final JSONObject kEmptyAcl = new JSONObject(); + public DMaaPKafkaMetaBroker() { fZk = null; fCambriaConfig = null; fBaseTopicData = null; final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "kafka.metadata.broker.list"); - if (StringUtils.isEmpty(fkafkaBrokers)) { + "kafka.metadata.broker.list"); + if (StringUtils.isEmpty(fkafkaBrokers)) { fkafkaBrokers = "localhost:9092"; } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + if(Utils.isCadiEnabled()){ props.putAll(Utils.addSaslProps()); } fKafkaAdminClient=AdminClient.create ( props ); - } - private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); - private final AdminClient fKafkaAdminClient; - - - /** * DMaaPKafkaMetaBroker constructor initializing * @@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @param configDb */ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "kafka.metadata.broker.list"); - if (null == fkafkaBrokers) { + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { fkafkaBrokers = "localhost:9092"; } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); if(Utils.isCadiEnabled()){ props.putAll(Utils.addSaslProps()); } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - fKafkaAdminClient=AdminClient.create ( props ); - - - } - public DMaaPKafkaMetaBroker( rrNvReadable settings, - ZkClient zk, ConfigDb configDb,AdminClient client) { - + public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - - - } @Override public List<Topic> getAllTopics() throws ConfigDbException { log.info("Retrieving list of all the topics."); - final LinkedList<Topic> result = new LinkedList<>(); + if (!GET_TOPICS_FROM_ZK) { + return getTopicsFromKafka(); + } + return getTopicsFromZookeeper(); + } + + private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException { + LinkedList<Topic> res = new LinkedList<>(); + final ListTopicsResult ltr = fKafkaAdminClient.listTopics(); try { - log.info("Retrieving all topics from root: " + zkTopicsRoot); - final List<String> topics = fZk.getChildren(zkTopicsRoot); + for (String name: ltr.names().get()) { + res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData)); + } + } catch (InterruptedException | ExecutionException e) { + log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e); + } + return res; + } + + private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException { + final LinkedList<Topic> legacyResult = new LinkedList<>(); + try { + log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT); + final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT); for (String topic : topics) { - result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); + legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); } JSONObject dataObj = new JSONObject(); dataObj.put("topics", new JSONObject()); @@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } } catch (ZkNoNodeException excp) { // very fresh kafka doesn't have any topics or a topics node - log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp); + log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp); } - return result; + return legacyResult; } @Override public Topic getTopic(String topic) throws ConfigDbException { - if (fZk.exists(zkTopicsRoot + "/" + topic)) { + if (!GET_TOPICS_FROM_ZK) { + try { + for (String name : fKafkaAdminClient.listTopics().names().get()) { + if (name.equals(topic)) { + log.debug("TOPIC_NAME: {} is equal to : {}", name, topic); + return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); + } + } + } catch (InterruptedException | ExecutionException e) { + log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e); + return null; + } + } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) { return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); } - // else: no such topic in kafka + // else: no such topic return null; } @@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 { */ @Override public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { - log.info("Creating topic: " + topic); + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { + log.info("Creating topic: {}", topic); try { - log.info("Check if topic [" + topic + "] exist."); + log.info("Check if topic [{}] exist.", topic); // first check for existence "our way" final Topic t = getTopic(topic); if (t != null) { - log.info("Could not create topic [" + topic + "]. Topic Already exists."); + log.info("Could not create topic [{}]. Topic Already exists.", topic); throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists."); } } catch (ConfigDbException e1) { log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1); throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "Couldn't check topic data in config db."); + "Couldn't check topic data in config db."); } // we only allow 3 replicas. (If we don't test this, we get weird // results from the cluster, // so explicit test and fail.) if (replicas < 1 || replicas > 3) { - log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3."); + log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic); throw new CambriaApiException(HttpStatusCodes.k400_badRequest, - "The replica count must be between 1 and 3."); + "The replica count must be between 1 and 3."); } if (partitions < 1) { - log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1."); + log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic); throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1."); } - // create via kafka - try { - final NewTopic topicRequest = - new NewTopic(topic, partitions, (short)replicas); - final CreateTopicsResult ctr = - fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); final KafkaFuture<Void> ctrResult = ctr.all(); ctrResult.get(); // underlying Kafka topic created. now setup our API info @@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException { - log.info("Deleting topic: " + topic); + log.info("Deleting topic: {}", topic); try { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - - fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); @@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } } - private final ZkClient fZk; - private final ConfigDb fCambriaConfig; - private final ConfigPath fBaseTopicData; - - private static final String zkTopicsRoot = "/brokers/topics"; - private static final JSONObject kEmptyAcl = new JSONObject(); - /** * method Providing KafkaTopic Object associated with owner and * transactionenabled or not @@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled) - throws ConfigDbException { + throws ConfigDbException { return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled); } @@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, - boolean transactionEnabled) throws ConfigDbException { + boolean transactionEnabled) throws ConfigDbException { final JSONObject o = new JSONObject(); o.put("owner", owner); o.put("description", desc); o.put("txenabled", transactionEnabled); - db.store(basePath.getChild(name), o.toString()); + if (GET_TOPICS_FROM_ZK) { + db.store(basePath.getChild(name), o.toString()); + } return new KafkaTopic(name, db, basePath); } /** - * class performing all user opearation like user is eligible to read, - * write. permitting a user to write and read, + * class performing all user operation like user is eligible to read, + * write. permitting a user to write and read etc * * @author anowarul.islam * @@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @param baseTopic * @throws ConfigDbException */ + + private final String fName; + private final ConfigDb fConfigDb; + private final ConfigPath fBaseTopicData; + private final String fOwner; + private final String fDesc; + private final NsaAcl fReaders; + private final NsaAcl fWriters; + private final boolean fTransactionEnabled; + public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException { fName = name; fConfigDb = configdb; @@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void permitWritesFromUser(String pubId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, false, true, pubId); } @@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void permitReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, true, true, consumerId); } @Override public void denyReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, true, false, consumerId); } private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key) - throws ConfigDbException, AccessDeniedException{ - try - { + throws ConfigDbException, AccessDeniedException{ + try { final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); - // we have to assume we have current data, or load it again. for the expected use // case, assuming we can overwrite the data is fine. final JSONObject o = new JSONObject (); @@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); - - } - catch ( ConfigDbException | AccessDeniedException x ) - { + } catch ( ConfigDbException | AccessDeniedException x ) { + log.info("Error when trying to update acl for key {}", key); throw x; } @@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { return acl == null ? null : acl.serialize(); } - private final String fName; - private final ConfigDb fConfigDb; - private final ConfigPath fBaseTopicData; - private final String fOwner; - private final String fDesc; - private final NsaAcl fReaders; - private final NsaAcl fWriters; - private boolean fTransactionEnabled; - public boolean isTransactionEnabled() { return fTransactionEnabled; } |