diff options
Diffstat (limited to 'pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala')
-rw-r--r-- | pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala index c0bc61e..838b155 100644 --- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala @@ -40,7 +40,7 @@ express or implied. package com.cisco.ztt import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.kafka.KafkaUtils +import org.apache.spark.streaming.kafka010.KafkaUtils import com.cisco.pnda.model.DataPlatformEventCodec import com.cisco.pnda.model.StaticHelpers @@ -48,6 +48,15 @@ import com.cisco.pnda.model.StaticHelpers import kafka.serializer.DefaultDecoder import kafka.serializer.StringDecoder import org.apache.log4j.Logger +import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent +import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe +import org.apache.spark.streaming.kafka010.PreferConsistent +import java.util.Arrays +import scala.collection.JavaConversions +import java.util.Collections +import org.springframework.core.serializer.DefaultDeserializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.ByteArrayDeserializer class KafkaInput extends Serializable { @@ -57,22 +66,29 @@ class KafkaInput extends Serializable { def readFromKafka(ssc: StreamingContext, topic: String) = { val props = AppConfig.loadProperties(); - val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers")) + val kafkaParams = collection.mutable.Map[String, Object]( + "bootstrap.servers" -> props.getProperty("kafka.brokers"), + "key.deserializer" -> classOf[StringDeserializer], + "value.deserializer" -> classOf[ByteArrayDeserializer], + "group.id" -> "pnda-ztt-app" + ) if (props.getProperty("kafka.consume_from_beginning").toBoolean) { kafkaParams.put("auto.offset.reset", "smallest"); } - Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list")) + Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers")) Holder.logger.info("Registering with kafka using topic " + topic) - val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder]( - ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism"))); + val messages = KafkaUtils.createDirectStream[String, Array[Byte]]( + ssc, PreferConsistent, + Subscribe[String, Array[Byte]](Arrays.asList(topic), JavaConversions.mapAsJavaMap(kafkaParams))) + // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism"))) // Decode avro container format val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc"); val rawMessages = messages.map(x => { val eventDecoder = new DataPlatformEventCodec(avroSchemaString); - val payload = x._2; + val payload = x.value; val dataPlatformEvent = eventDecoder.decode(payload); dataPlatformEvent; }); |