From d6be49830611313368e29ddaab8d0c314a3767c1 Mon Sep 17 00:00:00 2001
From: Donald Hunter
Date: Mon, 18 Mar 2019 11:57:39 +0000
Subject: Uprev spark/kafka to resolve vulnerabilities

Change-Id: Ibc18238a7fe7886c6da6b6d295de0d32d4620aa7
Issue-ID: DCAEGEN2-1207
Signed-off-by: Donald Hunter
---
 .../src/main/scala/com/cisco/ztt/KafkaInput.scala | 27 ++++++++++++++++++++-------
 .../cisco/xr/telemetry/TelemetryDispatcher.scala  |  2 +-
 .../sparkStreaming/PndaZTTApp/properties.json     |  3 ++-
 3 files changed, 23 insertions(+), 9 deletions(-)

(limited to 'pnda-ztt-app/src')

diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
index c0bc61e..838b155 100644
--- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -40,7 +40,7 @@ express or implied.
 package com.cisco.ztt
 
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.kafka010.KafkaUtils
 
 import com.cisco.pnda.model.DataPlatformEventCodec
 import com.cisco.pnda.model.StaticHelpers
@@ -48,6 +48,12 @@ import com.cisco.pnda.model.StaticHelpers
 
 import kafka.serializer.DefaultDecoder
 import kafka.serializer.StringDecoder
 import org.apache.log4j.Logger
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import java.util.Arrays
+import scala.collection.JavaConversions
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 class KafkaInput extends Serializable {
 
@@ -57,22 +63,29 @@ class KafkaInput extends Serializable {
 
     def readFromKafka(ssc: StreamingContext, topic: String) = {
         val props = AppConfig.loadProperties();
-        val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+        val kafkaParams = collection.mutable.Map[String, Object](
+            "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+            "key.deserializer" -> classOf[StringDeserializer],
+            "value.deserializer" -> classOf[ByteArrayDeserializer],
+            "group.id" -> "pnda-ztt-app"
+        )
         if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
-            kafkaParams.put("auto.offset.reset", "smallest");
+            kafkaParams.put("auto.offset.reset", "earliest"); // new-consumer value; "smallest" is 0.8-only
         }
 
-        Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+        Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers"))
         Holder.logger.info("Registering with kafka using topic " + topic)
 
-        val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
-            ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+        val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
+            ssc, PreferConsistent,
+            Subscribe[String, Array[Byte]](Arrays.asList(topic), JavaConversions.mapAsJavaMap(kafkaParams)))
+            // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
 
         // Decode avro container format
         val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
         val rawMessages = messages.map(x => {
             val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
-            val payload = x._2;
+            val payload = x.value;
             val dataPlatformEvent = eventDecoder.decode(payload);
             dataPlatformEvent;
         });
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
index b7deadc..4fc57f8 100644
--- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
@@ -62,7 +62,7 @@ class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
             }
         } catch {
             case t: Throwable => {
-                Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+                Holder.logger.error("Failed to parse JSON: " + t.getClass().getName() + " : " + t.getLocalizedMessage)
                 (false, Set[Payload]())
             }
         }
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
index 2f8ab6a..2301cb4 100644
--- a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
@@ -6,5 +6,6 @@
     "processing_parallelism" : "1",
     "checkpoint_path": "",
     "input_topic": "ves.avro",
-    "consume_from_beginning": "false"
+    "consume_from_beginning": "false",
+    "spark_version": "2"
 }
-- cgit 1.2.3-korg
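
Note on the new consumer wiring: the kafka010 integration is driven entirely by
the params map plus the location and consumer strategy objects used in the patch
above. For reference, here is a minimal, self-contained sketch of the same
direct-stream pattern, assuming Spark 2.x with the spark-streaming-kafka-0-10
artifact on the classpath; the broker address, group id, topic name, and object
name are illustrative placeholders, not values taken from this repository.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

    object DirectStreamSketch {
        def main(args: Array[String]): Unit = {
            val ssc = new StreamingContext(
                new SparkConf().setAppName("DirectStreamSketch").setMaster("local[2]"),
                Seconds(5))

            // New-consumer configuration: bootstrap.servers replaces the old
            // metadata.broker.list, and deserializer classes replace the 0.8 decoders.
            val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> "localhost:9092",  // placeholder broker
                "key.deserializer" -> classOf[StringDeserializer],
                "value.deserializer" -> classOf[ByteArrayDeserializer],
                "group.id" -> "sketch-group",             // placeholder group id
                "auto.offset.reset" -> "earliest")        // new-consumer spelling of "smallest"

            // The Scala overload of Subscribe accepts Scala collections directly,
            // so no java.util.Arrays or JavaConversions bridging is required.
            val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
                ssc, PreferConsistent,
                Subscribe[String, Array[Byte]](Seq("example.topic"), kafkaParams))

            // Records are ConsumerRecord[K, V]; value() replaces the old tuple's _2.
            stream.map(record => record.value().length).print()

            ssc.start()
            ssc.awaitTermination()
        }
    }

PreferConsistent spreads topic partitions evenly across the available executors
and preserves the one-to-one mapping between Kafka partitions and RDD
partitions; calling repartition() afterwards forces a shuffle and gives up that
mapping, which is presumably why the patch leaves the repartition call
commented out rather than carrying it over.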