path: root/pnda-ztt-app
Diffstat (limited to 'pnda-ztt-app')
-rw-r--r--  pnda-ztt-app/Makefile                                                       9
-rw-r--r--  pnda-ztt-app/build.sbt                                                     24
-rw-r--r--  pnda-ztt-app/pom.xml                                                        2
-rw-r--r--  pnda-ztt-app/project/plugins.sbt                                            1
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala                 28
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala  2
-rw-r--r--  pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json        3
-rw-r--r--  pnda-ztt-app/test/log4j.testing.properties                                  4
8 files changed, 50 insertions, 23 deletions
diff --git a/pnda-ztt-app/Makefile b/pnda-ztt-app/Makefile
index 7c08bde..69226cc 100644
--- a/pnda-ztt-app/Makefile
+++ b/pnda-ztt-app/Makefile
@@ -1,6 +1,7 @@
-SERVER=https://knox.example.com:8443/gateway/pnda/deployment
+SERVER=https://knox.example.com:8443/gateway/pnda/repository
APP=src/universal/sparkStreaming/PndaZTTApp/PndaZTTApp.jar
-PACKAGE=pnda-ztt-app-0.0.3.tar.gz
+PACKAGE=pnda-ztt-app-0.0.4.tar.gz
+DEBUG=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
app: ## Build the application jar
sbt assembly
@@ -9,13 +10,13 @@ package: ## Build the deployable package
sbt 'universal:packageZipTarball'
deploy: ## Deploy the package to PNDA cluster
- curl -k -u pnda:pnda -X PUT $(SERVER)/packages/$(PACKAGE) --data-binary @target/universal/$(PACKAGE) > /dev/null
+ curl -k -u pnda:pnda -X PUT $(SERVER)/packages/ --upload-file target/universal/$(PACKAGE) > /dev/null
list: ## Show the deployed packages
curl $(SERVER)/packages
delete: ## Delete the deployed package
- curl -XDELETE $(SERVER)/packages/$(PACKAGE)
+ curl -k -u pnda:pnda -XDELETE $(SERVER)/packages/$(PACKAGE)
test/PndaZTTApp.jar: $(APP) test/application.properties
cp $< $@
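In the deploy target, curl's --upload-file appends the local file name to a URL ending in /, so the PUT still lands on $(SERVER)/packages/$(PACKAGE). The new DEBUG variable, by contrast, is only defined here and not yet consumed; the JDWP string takes effect once it reaches a JVM. A hedged sketch of one way to wire it in, via Spark's extraJavaOptions (only meaningful for a cluster-mode driver; a client-mode driver JVM is already running, so there the string belongs on spark-submit's --driver-java-options flag):

    import org.apache.spark.SparkConf

    // Hedged sketch: hand the DEBUG agent string to a cluster-mode driver so it
    // suspends on startup until a debugger attaches on port 5005.
    val debugConf = new SparkConf()
      .set("spark.driver.extraJavaOptions",
        "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")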
diff --git a/pnda-ztt-app/build.sbt b/pnda-ztt-app/build.sbt
index 5811d22..24f6bdb 100644
--- a/pnda-ztt-app/build.sbt
+++ b/pnda-ztt-app/build.sbt
@@ -1,8 +1,8 @@
name := "pnda-ztt-app"
-version := "0.0.3"
+version := "0.0.4"
-scalaVersion := "2.10.6"
+scalaVersion := "2.11.8"
enablePlugins(UniversalPlugin)
@@ -15,20 +15,24 @@ packageZipTarball in Universal := {
}
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
- "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
- "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" ,
- "org.apache.kafka" %% "kafka" % "0.8.2.1",
+ "org.apache.spark" %% "spark-core" % "2.3.0" % "provided",
+ "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
+ "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.3" ,
+ "org.apache.kafka" %% "kafka" % "0.11.0.3",
"org.apache.avro" % "avro" % "1.7.7",
"joda-time" % "joda-time" % "2.8.1" % "provided",
"log4j" % "log4j" % "1.2.17" % "provided",
"org.apache.httpcomponents" % "httpcore" % "4.2.5" % "provided",
"org.apache.httpcomponents" % "httpclient" % "4.2.5" % "provided",
- "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.3" % "provided",
- "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.2.3",
- "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.2.3" % "provided",
+ "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7" % "provided",
+ "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7" % "provided",
+ "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.7",
+ "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1" % "provided",
"org.springframework" % "spring-core" % "4.3.10.RELEASE",
- "org.scalatest" % "scalatest_2.10" % "3.0.1" % "test"
+ "org.scalatest" % "scalatest_2.11" % "3.0.5" % "test",
+ "org.scala-lang" % "scala-reflect" % "2.11.8",
+ "org.scala-lang" % "scala-compiler" % "2.11.8" % "provided",
+ "org.scalactic" % "scalactic_2.11" % "3.2.0-SNAP10"
)
EclipseKeys.withSource := true
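The Jackson pins above track the 2.6.7 line that Spark 2.3 itself ships, which is why most of them are "provided". If a transitive dependency later drags in a newer jackson-databind, the job can fail at runtime with incompatible-version errors; a hedged sketch of guarding against that in sbt 0.13, where dependencyOverrides is a Set:

    // Hedged sketch: pin the Jackson line so transitive bumps cannot outrun
    // the 2.6.7 artifacts that Spark 2.3 provides at runtime.
    dependencyOverrides ++= Set(
      "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
      "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
      "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1"
    )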
diff --git a/pnda-ztt-app/pom.xml b/pnda-ztt-app/pom.xml
index 68417cd..b5ba320 100644
--- a/pnda-ztt-app/pom.xml
+++ b/pnda-ztt-app/pom.xml
@@ -65,7 +65,7 @@
<configuration>
<artifacts>
<artifact>
- <file>${project.build.directory}/universal/${project.name}-0.0.3.tar.gz</file>
+ <file>${project.build.directory}/universal/${project.name}-0.0.4.tar.gz</file>
<type>tar.gz</type>
<classifier>app</classifier>
</artifact>
diff --git a/pnda-ztt-app/project/plugins.sbt b/pnda-ztt-app/project/plugins.sbt
index b03dd6c..8db1a6c 100644
--- a/pnda-ztt-app/project/plugins.sbt
+++ b/pnda-ztt-app/project/plugins.sbt
@@ -1,3 +1,4 @@
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.1")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.1")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
index c0bc61e..838b155 100644
--- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -40,7 +40,7 @@ express or implied.
package com.cisco.ztt
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.kafka010.KafkaUtils
import com.cisco.pnda.model.DataPlatformEventCodec
import com.cisco.pnda.model.StaticHelpers
@@ -48,6 +48,12 @@ import com.cisco.pnda.model.StaticHelpers
import kafka.serializer.DefaultDecoder
import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import java.util.Arrays
+import scala.collection.JavaConversions
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
class KafkaInput extends Serializable {
@@ -57,22 +66,29 @@ class KafkaInput extends Serializable {
def readFromKafka(ssc: StreamingContext, topic: String) = {
val props = AppConfig.loadProperties();
- val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+ val kafkaParams = collection.mutable.Map[String, Object](
+ "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+ "key.deserializer" -> classOf[StringDeserializer],
+ "value.deserializer" -> classOf[ByteArrayDeserializer],
+ "group.id" -> "pnda-ztt-app"
+ )
if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
kafkaParams.put("auto.offset.reset", "smallest");
}
- Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+ Holder.logger.info("Registering with kafka using bootstrap servers " + kafkaParams("bootstrap.servers"))
Holder.logger.info("Registering with kafka using topic " + topic)
- val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
- ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+ val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
+ ssc, PreferConsistent,
+ Subscribe[String, Array[Byte]](Arrays.asList(topic), JavaConversions.mapAsJavaMap(kafkaParams)))
+ // .repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
// Decode avro container format
val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
val rawMessages = messages.map(x => {
val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
- val payload = x._2;
+ val payload = x.value;
val dataPlatformEvent = eventDecoder.decode(payload);
dataPlatformEvent;
});
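Two details of the hunk above are worth flagging. First, the untouched auto.offset.reset value "smallest" is the 0.8 spelling; the 0.10 consumer accepts only earliest, latest, or none and rejects "smallest" at startup. Second, ConsumerStrategies.Subscribe has a Scala overload, so the Arrays.asList/JavaConversions detour is unnecessary. A minimal sketch of the same setup, assuming ssc and the broker string come from the surrounding code:

    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    def readFromKafkaSketch(ssc: StreamingContext, topic: String, brokers: String) = {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[ByteArrayDeserializer],
        "group.id" -> "pnda-ztt-app",
        "auto.offset.reset" -> "earliest") // the 0.10 spelling of "smallest"
      // Subscribe accepts Scala collections directly.
      KafkaUtils.createDirectStream[String, Array[Byte]](
          ssc, PreferConsistent, Subscribe[String, Array[Byte]](Seq(topic), kafkaParams))
        .map(_.value) // ConsumerRecord.value replaces the old tuple's _2
    }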
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
index b7deadc..4fc57f8 100644
--- a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
@@ -62,7 +62,7 @@ class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
}
} catch {
case t: Throwable => {
- Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+ Holder.logger.error("Failed to parse JSON: " + t.getClass().getName() + " : " + t.getLocalizedMessage)
(false, Set[Payload]())
}
}
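Logging the exception class name is a real improvement here, since many JSON parse failures carry terse or null messages. A further hedged refinement, not part of this change: matching scala.util.control.NonFatal instead of Throwable lets fatal JVM errors propagate rather than being logged and swallowed. A self-contained sketch (safeParse, parse, and Payload are stand-ins for the surrounding code):

    import org.apache.log4j.Logger
    import scala.util.control.NonFatal

    // Hedged variant of the catch block above: NonFatal skips OutOfMemoryError
    // and other fatal errors so they surface instead of being swallowed.
    def safeParse[Payload](logger: Logger)(parse: => Set[Payload]): (Boolean, Set[Payload]) =
      try {
        (true, parse)
      } catch {
        case NonFatal(t) =>
          logger.error("Failed to parse JSON: " + t.getClass.getName + " : " + t.getLocalizedMessage)
          (false, Set[Payload]())
      }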
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
index 2f8ab6a..2301cb4 100644
--- a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
@@ -6,5 +6,6 @@
"processing_parallelism" : "1",
"checkpoint_path": "",
"input_topic": "ves.avro",
- "consume_from_beginning": "false"
+ "consume_from_beginning": "false",
+ "spark_version": "2"
}
diff --git a/pnda-ztt-app/test/log4j.testing.properties b/pnda-ztt-app/test/log4j.testing.properties
index 1388b20..de05e30 100644
--- a/pnda-ztt-app/test/log4j.testing.properties
+++ b/pnda-ztt-app/test/log4j.testing.properties
@@ -4,6 +4,10 @@ log4j.logger.com.cisco.pnda=TRACE,console
log4j.additivity.com.cisco.pnda=false
log4j.logger.com.cisco.ztt=TRACE,console
log4j.additivity.com.cisco.ztt=false
+log4j.logger.org.spark_project=INFO,console
+log4j.additivity.org.spark_project=false
+log4j.logger.org.apache=WARN,console
+log4j.additivity.org.apache=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out