summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java')
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java37
1 files changed, 13 insertions, 24 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
index 30209f0..735e372 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -41,11 +41,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import kafka.FailedToSendMessageException;
-//import kafka.javaapi.producer.Producer;
-//import kafka.producer.KeyedMessage;
-//import kafka.producer.ProducerConfig;
-//import kafka.producer.KeyedMessage;
+
/**
* Sends raw JSON objects into Kafka.
@@ -76,26 +72,23 @@ public class KafkaPublisher implements Publisher {
kafkaConnUrl="localhost:9092";
}
- //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
- // props.put("bootstrap.servers", bootSever);
- //System.setProperty("java.security.auth.login.config",jaaspath);
+
- /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
- transferSetting( props, "sasl.mechanism", "PLAIN");*/
+
+
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
- //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+
transferSetting( props, "request.required.acks", "1");
transferSetting( props, "message.send.max.retries", "5");
transferSetting(props, "retry.backoff.ms", "150");
- //props.put("serializer.class", "kafka.serializer.StringEncoder");
+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //fConfig = new ProducerConfig(props);
- //fProducer = new Producer<String, String>(fConfig);
+
+
fProducer = new KafkaProducer<>(props);
}
@@ -180,11 +173,11 @@ public class KafkaPublisher implements Publisher {
throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
try{
- final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+ final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
for (message o : msgs) {
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
- //kms.add(data);
+ final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
+
try {
@@ -200,7 +193,7 @@ try{
}
//private final rrNvReadable fSettings;
- //private ProducerConfig fConfig;
+
private Producer<String, String> fProducer;
/**
@@ -227,9 +220,5 @@ try{
}
- //@Override
- //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- // TODO Auto-generated method stub
-
- //}
+
} \ No newline at end of file