diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java | 60 |
1 files changed, 21 insertions, 39 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 4bdd9f3..5f616c7 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -26,21 +26,19 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Properties; - import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.json.JSONException; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.util.StringUtils; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.util.StringUtils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; +import kafka.common.FailedToSendMessageException; @@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher { */ @Override public void sendMessage(String topic, message msg) throws IOException{ - final List<message> msgs = new LinkedList<message>(); + final List<message> msgs = new LinkedList<>(); msgs.add(msg); sendMessages(topic, msgs); } @@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher { } } */ @Override - public void sendMessagesNew(String topic, List<? extends message> msgs) - throws IOException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); -try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); - for (message o : msgs) { - - final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); - - - try { - - fProducer.send(data); - - } catch (Exception excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new Exception(excp.getMessage(), excp); - } - } - - }catch(Exception e){} -} - //private final rrNvReadable fSettings; + public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + try { + for (message o : msgs) { + final ProducerRecord<String, String> data = + new ProducerRecord<>(topic, o.getKey(), o.toString()); + fProducer.send(data); + } + } catch (Exception e) { + log.error("Failed to send message(s) to topic [" + topic + "].", e); + } + } private Producer<String, String> fProducer; @@ -203,14 +190,11 @@ try{ * @param defVal */ private void transferSetting(Properties props, String key, String defVal) { - String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); - if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal; - //props.put(key, settings.getString("kafka." + key, defVal)); - props.put(key, kafka_prop); + String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); + if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal; + props.put(key, kafkaProp); } - //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class); @Override @@ -218,6 +202,4 @@ try{ // TODO Auto-generated method stub } - - -}
\ No newline at end of file +} |