diff options
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java | 60 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 55 |
2 files changed, 47 insertions, 68 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 4bdd9f3..5f616c7 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -26,21 +26,19 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Properties; - import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.json.JSONException; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.util.StringUtils; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.util.StringUtils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; +import kafka.common.FailedToSendMessageException; @@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher { */ @Override public void sendMessage(String topic, message msg) throws IOException{ - final List<message> msgs = new LinkedList<message>(); + final List<message> msgs = new LinkedList<>(); msgs.add(msg); sendMessages(topic, msgs); } @@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher { } } */ @Override - public void sendMessagesNew(String topic, List<? extends message> msgs) - throws IOException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); -try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); - for (message o : msgs) { - - final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); - - - try { - - fProducer.send(data); - - } catch (Exception excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new Exception(excp.getMessage(), excp); - } - } - - }catch(Exception e){} -} - //private final rrNvReadable fSettings; + public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + try { + for (message o : msgs) { + final ProducerRecord<String, String> data = + new ProducerRecord<>(topic, o.getKey(), o.toString()); + fProducer.send(data); + } + } catch (Exception e) { + log.error("Failed to send message(s) to topic [" + topic + "].", e); + } + } private Producer<String, String> fProducer; @@ -203,14 +190,11 @@ try{ * @param defVal */ private void transferSetting(Properties props, String key, String defVal) { - String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); - if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal; - //props.put(key, settings.getString("kafka." + key, defVal)); - props.put(key, kafka_prop); + String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); + if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal; + props.put(key, kafkaProp); } - //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class); @Override @@ -218,6 +202,4 @@ try{ // TODO Auto-generated method stub } - - -}
\ No newline at end of file +} diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 03a1bd5..d7fa28b 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -222,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } // create via kafka - - try - { - final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() ); - final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); - final KafkaFuture<Void> ctrResult = ctr.all (); - ctrResult.get (); - // underlying Kafka topic created. now setup our API info - return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled ); - } - catch ( InterruptedException e ) - { - - log.warn ( "Execution of describeTopics timed out." ); - throw new ConfigDbException ( e ); - } - catch ( ExecutionException e ) - { - - log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); - throw new ConfigDbException ( e.getCause () ); - } + + try { + final NewTopic topicRequest = + new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = + fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final KafkaFuture<Void> ctrResult = ctr.all(); + ctrResult.get(); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (InterruptedException e) { + log.warn("Execution of describeTopics timed out."); + throw new ConfigDbException(e); + } catch (ExecutionException e) { + log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); + throw new ConfigDbException(e.getCause()); + } } @@ -348,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 { // owner (or it's empty), null is okay -- this is for existing or implicitly // created topics. JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; + if ( readers == null && fOwner.length () > 0 ) + { + readers = kEmptyAcl; + } fReaders = fromJson ( readers ); JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; + if ( writers == null && fOwner.length () > 0 ) + { + writers = kEmptyAcl; + } fWriters = fromJson ( writers ); } @@ -445,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); } - catch ( ConfigDbException x ) - { - throw x; - } - catch ( AccessDeniedException x ) + catch ( ConfigDbException | AccessDeniedException x ) { throw x; } |