summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java60
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java55
2 files changed, 47 insertions, 68 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
index 4bdd9f3..5f616c7 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -26,21 +26,19 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONException;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.util.StringUtils;
import org.onap.dmaap.dmf.mr.backends.Publisher;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.util.StringUtils;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import kafka.common.FailedToSendMessageException;
@@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher {
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
@@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher {
*/
@Override
public void sendMessage(String topic, message msg) throws IOException{
- final List<message> msgs = new LinkedList<message>();
+ final List<message> msgs = new LinkedList<>();
msgs.add(msg);
sendMessages(topic, msgs);
}
@@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher {
}
} */
@Override
- public void sendMessagesNew(String topic, List<? extends message> msgs)
- throws IOException {
- log.info("sending " + msgs.size() + " events to [" + topic + "]");
-try{
- final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
- for (message o : msgs) {
-
- final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
-
-
- try {
-
- fProducer.send(data);
-
- } catch (Exception excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new Exception(excp.getMessage(), excp);
- }
- }
-
- }catch(Exception e){}
-}
- //private final rrNvReadable fSettings;
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+ log.info("sending " + msgs.size() + " events to [" + topic + "]");
+ try {
+ for (message o : msgs) {
+ final ProducerRecord<String, String> data =
+ new ProducerRecord<>(topic, o.getKey(), o.toString());
+ fProducer.send(data);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", e);
+ }
+ }
private Producer<String, String> fProducer;
@@ -203,14 +190,11 @@ try{
* @param defVal
*/
private void transferSetting(Properties props, String key, String defVal) {
- String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
- if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal;
- //props.put(key, settings.getString("kafka." + key, defVal));
- props.put(key, kafka_prop);
+ String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
+ if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
+ props.put(key, kafkaProp);
}
- //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
-
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
@Override
@@ -218,6 +202,4 @@ try{
// TODO Auto-generated method stub
}
-
-
-} \ No newline at end of file
+}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 03a1bd5..d7fa28b 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -222,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
// create via kafka
-
- try
- {
- final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
- final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
- final KafkaFuture<Void> ctrResult = ctr.all ();
- ctrResult.get ();
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
- }
- catch ( InterruptedException e )
- {
-
- log.warn ( "Execution of describeTopics timed out." );
- throw new ConfigDbException ( e );
- }
- catch ( ExecutionException e )
- {
-
- log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
- throw new ConfigDbException ( e.getCause () );
- }
+
+ try {
+ final NewTopic topicRequest =
+ new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr =
+ fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final KafkaFuture<Void> ctrResult = ctr.all();
+ ctrResult.get();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+ } catch (InterruptedException e) {
+ log.warn("Execution of describeTopics timed out.");
+ throw new ConfigDbException(e);
+ } catch (ExecutionException e) {
+ log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+ throw new ConfigDbException(e.getCause());
+ }
}
@@ -348,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
// owner (or it's empty), null is okay -- this is for existing or implicitly
// created topics.
JSONObject readers = o.optJSONObject ( "readers" );
- if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
+ if ( readers == null && fOwner.length () > 0 )
+ {
+ readers = kEmptyAcl;
+ }
fReaders = fromJson ( readers );
JSONObject writers = o.optJSONObject ( "writers" );
- if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
+ if ( writers == null && fOwner.length () > 0 )
+ {
+ writers = kEmptyAcl;
+ }
fWriters = fromJson ( writers );
}
@@ -445,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
}
- catch ( ConfigDbException x )
- {
- throw x;
- }
- catch ( AccessDeniedException x )
+ catch ( ConfigDbException | AccessDeniedException x )
{
throw x;
}