diff options
Diffstat (limited to 'pnda-ztt-app')
-rw-r--r-- | pnda-ztt-app/Makefile | 9 | ||||
-rw-r--r-- | pnda-ztt-app/build.sbt | 24 | ||||
-rw-r--r-- | pnda-ztt-app/pom.xml | 2 | ||||
-rw-r--r-- | pnda-ztt-app/project/plugins.sbt | 1 | ||||
-rw-r--r-- | pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala | 28 | ||||
-rw-r--r-- | pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala | 2 | ||||
-rw-r--r-- | pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json | 3 | ||||
-rw-r--r-- | pnda-ztt-app/test/log4j.testing.properties | 4 |
8 files changed, 50 insertions, 23 deletions
diff --git a/pnda-ztt-app/Makefile b/pnda-ztt-app/Makefile index 7c08bde..69226cc 100644 --- a/pnda-ztt-app/Makefile +++ b/pnda-ztt-app/Makefile @@ -1,6 +1,7 @@ -SERVER=https://knox.example.com:8443/gateway/pnda/deployment +SERVER=https://knox.example.com:8443/gateway/pnda/repository APP=src/universal/sparkStreaming/PndaZTTApp/PndaZTTApp.jar -PACKAGE=pnda-ztt-app-0.0.3.tar.gz +PACKAGE=pnda-ztt-app-0.0.4.tar.gz +DEBUG=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" app: ## Build the application jar sbt assembly @@ -9,13 +10,13 @@ package: ## Build the deployable package sbt 'universal:packageZipTarball' deploy: ## Deploy the package to PNDA cluster - curl -k -u pnda:pnda -X PUT $(SERVER)/packages/$(PACKAGE) --data-binary @target/universal/$(PACKAGE) > /dev/null + curl -k -u pnda:pnda -X PUT $(SERVER)/packages/ --upload-file target/universal/$(PACKAGE) > /dev/null list: ## Show the deployed packages curl $(SERVER)/packages delete: ## Delete the deployed package - curl -XDELETE $(SERVER)/packages/$(PACKAGE) + curl -k -u pnda:pnda -XDELETE $(SERVER)/packages/$(PACKAGE) test/PndaZTTApp.jar: $(APP) test/application.properties cp $< $@ diff --git a/pnda-ztt-app/build.sbt b/pnda-ztt-app/build.sbt index 5811d22..24f6bdb 100644 --- a/pnda-ztt-app/build.sbt +++ b/pnda-ztt-app/build.sbt @@ -1,8 +1,8 @@ name := "pnda-ztt-app" -version := "0.0.3" +version := "0.0.4" -scalaVersion := "2.10.6" +scalaVersion := "2.11.8" enablePlugins(UniversalPlugin) @@ -15,20 +15,24 @@ packageZipTarball in Universal := { } libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-core" % "1.6.0" % "provided", - "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided", - "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" , - "org.apache.kafka" %% "kafka" % "0.8.2.1", + "org.apache.spark" %% "spark-core" % "2.3.0" % "provided", + "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided", + "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.3" , + "org.apache.kafka" %% "kafka" % "0.11.0.3", "org.apache.avro" % "avro" % "1.7.7", "joda-time" % "joda-time" % "2.8.1" % "provided", "log4j" % "log4j" % "1.2.17" % "provided", "org.apache.httpcomponents" % "httpcore" % "4.2.5" % "provided", "org.apache.httpcomponents" % "httpclient" % "4.2.5" % "provided", - "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.3" % "provided", - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.2.3", - "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.2.3" % "provided", + "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7" % "provided", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7" % "provided", + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.7", + "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1" % "provided", "org.springframework" % "spring-core" % "4.3.10.RELEASE", - "org.scalatest" % "scalatest_2.10" % "3.0.1" % "test" + "org.scalatest" % "scalatest_2.11" % "3.0.5" % "test", + "org.scala-lang" % "scala-reflect" % "2.11.8", + "org.scala-lang" % "scala-compiler" % "2.11.8" % "provided", + "org.scalactic" % "scalactic_2.11" % "3.2.0-SNAP10" ) EclipseKeys.withSource := true diff --git a/pnda-ztt-app/pom.xml b/pnda-ztt-app/pom.xml index 68417cd..b5ba320 100644 --- a/pnda-ztt-app/pom.xml +++ b/pnda-ztt-app/pom.xml @@ -65,7 +65,7 @@ <configuration> <artifacts> <artifact> - <file>${project.build.directory}/universal/${project.name}-0.0.3.tar.gz</file> + <file>${project.build.directory}/universal/${project.name}-0.0.4.tar.gz</file> <type>tar.gz</type> <classifier>app</classifier> </artifact> diff --git a/pnda-ztt-app/project/plugins.sbt b/pnda-ztt-app/project/plugins.sbt index b03dd6c..8db1a6c 100644 --- a/pnda-ztt-app/project/plugins.sbt +++ b/pnda-ztt-app/project/plugins.sbt @@ -1,3 +1,4 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.1") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.1") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2") diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala index c0bc61e..838b155 100644 --- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala @@ -40,7 +40,7 @@ express or implied. package com.cisco.ztt import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.kafka.KafkaUtils +import org.apache.spark.streaming.kafka010.KafkaUtils import com.cisco.pnda.model.DataPlatformEventCodec import com.cisco.pnda.model.StaticHelpers @@ -48,6 +48,15 @@ import com.cisco.pnda.model.StaticHelpers import kafka.serializer.DefaultDecoder import kafka.serializer.StringDecoder import org.apache.log4j.Logger +import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent +import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe +import org.apache.spark.streaming.kafka010.PreferConsistent +import java.util.Arrays +import scala.collection.JavaConversions +import java.util.Collections +import org.springframework.core.serializer.DefaultDeserializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.ByteArrayDeserializer class KafkaInput extends Serializable { @@ -57,22 +66,29 @@ class KafkaInput extends Serializable { def readFromKafka(ssc: StreamingContext, topic: String) = { val props = AppConfig.loadProperties(); - val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers")) + val kafkaParams = collection.mutable.Map[String, Object]( + "bootstrap.servers" -> props.getProperty("kafka.brokers"), + "key.deserializer" -> classOf[StringDeserializer], + "value.deserializer" -> classOf[ByteArrayDeserializer], + "group.id" -> "pnda-ztt-app" + ) if (props.getProperty("kafka.consume_from_beginning").toBoolean) { kafkaParams.put("auto.offset.reset", "smallest"); } - Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list")) + Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers")) Holder.logger.info("Registering with kafka using topic " + topic) - val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder]( - ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism"))); + val messages = KafkaUtils.createDirectStream[String, Array[Byte]]( + ssc, PreferConsistent, + Subscribe[String, Array[Byte]](Arrays.asList(topic), JavaConversions.mapAsJavaMap(kafkaParams))) + // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism"))) // Decode avro container format val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc"); val rawMessages = messages.map(x => { val eventDecoder = new DataPlatformEventCodec(avroSchemaString); - val payload = x._2; + val payload = x.value; val dataPlatformEvent = eventDecoder.decode(payload); dataPlatformEvent; }); diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala index b7deadc..4fc57f8 100644 --- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala @@ -62,7 +62,7 @@ class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer { } } catch { case t: Throwable => { - Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage) + Holder.logger.error("Failed to parse JSON: " + t.getClass().getName() + " : " + t.getLocalizedMessage) (false, Set[Payload]()) } } diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json index 2f8ab6a..2301cb4 100644 --- a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json +++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json @@ -6,5 +6,6 @@ "processing_parallelism" : "1", "checkpoint_path": "", "input_topic": "ves.avro", - "consume_from_beginning": "false" + "consume_from_beginning": "false", + "spark_version": "2" } diff --git a/pnda-ztt-app/test/log4j.testing.properties b/pnda-ztt-app/test/log4j.testing.properties index 1388b20..de05e30 100644 --- a/pnda-ztt-app/test/log4j.testing.properties +++ b/pnda-ztt-app/test/log4j.testing.properties @@ -4,6 +4,10 @@ log4j.logger.com.cisco.pnda=TRACE,console log4j.additivity.com.cisco.pnda=false log4j.logger.com.cisco.ztt=TRACE,console log4j.additivity.com.cisco.ztt=false +log4j.logger.org.spark_project=INFO,console +log4j.additivity.org.spark_project=false +log4j.logger.org.apache=WARN,console +log4j.additivity.org.apache=false log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out |