diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java index 30209f0..dc4bcd5 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -41,7 +41,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import kafka.FailedToSendMessageException; + //import kafka.javaapi.producer.Producer; //import kafka.producer.KeyedMessage; //import kafka.producer.ProducerConfig; @@ -76,25 +76,23 @@ public class KafkaPublisher implements Publisher { kafkaConnUrl="localhost:9092"; } - //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf"; + // props.put("bootstrap.servers", bootSever); //System.setProperty("java.security.auth.login.config",jaaspath); - /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); - transferSetting( props, "sasl.mechanism", "PLAIN");*/ + transferSetting( props, "bootstrap.servers",kafkaConnUrl); - //transferSetting( props, "metadata.broker.list", kafkaConnUrl); + transferSetting( props, "request.required.acks", "1"); transferSetting( props, "message.send.max.retries", "5"); transferSetting(props, "retry.backoff.ms", "150"); - //props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - //fConfig = new ProducerConfig(props); + //fProducer = new Producer<String, String>(fConfig); fProducer = new KafkaProducer<>(props); } @@ -184,7 +182,7 @@ try{ for (message o : msgs) { final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString()); - //kms.add(data); + try { @@ -200,7 +198,7 @@ try{ } //private final rrNvReadable fSettings; - //private ProducerConfig fConfig; + private Producer<String, String> fProducer; /** @@ -227,9 +225,5 @@ try{ } - //@Override - //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { - // TODO Auto-generated method stub - - //} + }
\ No newline at end of file |