diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 03a1bd5..d7fa28b 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -222,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } // create via kafka - - try - { - final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() ); - final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); - final KafkaFuture<Void> ctrResult = ctr.all (); - ctrResult.get (); - // underlying Kafka topic created. now setup our API info - return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled ); - } - catch ( InterruptedException e ) - { - - log.warn ( "Execution of describeTopics timed out." ); - throw new ConfigDbException ( e ); - } - catch ( ExecutionException e ) - { - - log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); - throw new ConfigDbException ( e.getCause () ); - } + + try { + final NewTopic topicRequest = + new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = + fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final KafkaFuture<Void> ctrResult = ctr.all(); + ctrResult.get(); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (InterruptedException e) { + log.warn("Execution of describeTopics timed out."); + throw new ConfigDbException(e); + } catch (ExecutionException e) { + log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); + throw new ConfigDbException(e.getCause()); + } } @@ -348,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 { // owner (or it's empty), null is okay -- this is for existing or implicitly // created topics. JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; + if ( readers == null && fOwner.length () > 0 ) + { + readers = kEmptyAcl; + } fReaders = fromJson ( readers ); JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; + if ( writers == null && fOwner.length () > 0 ) + { + writers = kEmptyAcl; + } fWriters = fromJson ( writers ); } @@ -445,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); } - catch ( ConfigDbException x ) - { - throw x; - } - catch ( AccessDeniedException x ) + catch ( ConfigDbException | AccessDeniedException x ) { throw x; } |