Diffstat (limited to 'pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala')
 pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala | 26 +++++++++++++++++---------
 1 file changed, 17 insertions(+), 9 deletions(-)
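This change migrates KafkaInput from the spark-streaming-kafka (Kafka 0.8) direct-stream API to spark-streaming-kafka-0-10: broker discovery switches from metadata.broker.list to bootstrap.servers, the old StringDecoder/DefaultDecoder pair is replaced by new-consumer deserializer classes, and the stream is built from a LocationStrategy (PreferConsistent) plus a ConsumerStrategy (Subscribe).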
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
index c0bc61e..838b155 100644
--- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -40,7 +40,7 @@ express or implied.
 package com.cisco.ztt
 
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.kafka010.KafkaUtils
 
 import com.cisco.pnda.model.DataPlatformEventCodec
 import com.cisco.pnda.model.StaticHelpers
@@ -48,6 +48,8 @@ import com.cisco.pnda.model.StaticHelpers
 
-import kafka.serializer.DefaultDecoder
-import kafka.serializer.StringDecoder
 import org.apache.log4j.Logger
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 class KafkaInput extends Serializable {
@@ -57,22 +59,28 @@ class KafkaInput extends Serializable {
 
     def readFromKafka(ssc: StreamingContext, topic: String) = {
         val props = AppConfig.loadProperties();
-        val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+        val kafkaParams = collection.mutable.Map[String, Object](
+            "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+            "key.deserializer" -> classOf[StringDeserializer],
+            "value.deserializer" -> classOf[ByteArrayDeserializer],
+            "group.id" -> "pnda-ztt-app"
+        )
         if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
-            kafkaParams.put("auto.offset.reset", "smallest");
+            kafkaParams.put("auto.offset.reset", "earliest");
         }
 
-        Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+        Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers"))
         Holder.logger.info("Registering with kafka using topic " + topic)
 
-        val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
-            ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+        val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
+            ssc, PreferConsistent,
+            Subscribe[String, Array[Byte]](Seq(topic), kafkaParams))
+            // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
 
         // Decode avro container format
         val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
         val rawMessages = messages.map(x => {
             val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
-            val payload = x._2;
+            val payload = x.value;
             val dataPlatformEvent = eventDecoder.decode(payload);
             dataPlatformEvent;
         });
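
For reference, below is a minimal self-contained sketch of the migrated read path under the kafka010 API. It is a sketch under stated assumptions, not the committed code: KafkaInputSketch, the brokers/consumeFromBeginning parameters, and the local[2] harness in main are illustrative stand-ins for the AppConfig-driven setup, and "test.topic"/"localhost:9092" are placeholders; in the real app the payload bytes go on to DataPlatformEventCodec as in the diff above.

import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaInputSketch {

    // Read raw byte payloads from a topic with the kafka010 direct stream.
    // brokers and consumeFromBeginning stand in for the kafka.brokers and
    // kafka.consume_from_beginning properties the real app loads via AppConfig.
    def readFromKafka(ssc: StreamingContext, topic: String, brokers: String,
            consumeFromBeginning: Boolean): DStream[Array[Byte]] = {

        // The new consumer takes bootstrap.servers plus explicit deserializer
        // classes instead of metadata.broker.list plus the 0.8 decoders.
        val kafkaParams = collection.mutable.Map[String, Object](
            "bootstrap.servers" -> brokers,
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[ByteArrayDeserializer],
            "group.id" -> "pnda-ztt-app")

        // The 0.8 consumer accepted "smallest"; the new consumer only accepts
        // "earliest"/"latest" and fails fast on the old values.
        if (consumeFromBeginning) {
            kafkaParams.put("auto.offset.reset", "earliest")
        }

        // The Scala overload of Subscribe takes Scala collections directly, so
        // no java.util.Arrays / JavaConversions round-trip is needed.
        val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
            ssc, PreferConsistent, Subscribe[String, Array[Byte]](Seq(topic), kafkaParams))

        // Records are now ConsumerRecord[K, V] rather than (K, V) tuples,
        // hence x.value where the 0.8 code used x._2.
        messages.map(_.value)
    }

    def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
            new SparkConf().setAppName("pnda-ztt-app").setMaster("local[2]"), Seconds(5))
        readFromKafka(ssc, "test.topic", "localhost:9092", consumeFromBeginning = true)
            .map(_.length) // the real app decodes these bytes with DataPlatformEventCodec
            .print()
        ssc.start()
        ssc.awaitTermination()
    }
}

PreferConsistent simply spreads the topic's partitions evenly over the available executors and is the documented default choice. With the repartition call left commented out, downstream parallelism follows the topic's partition count rather than the app.processing_parallelism property.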