summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchenxdu <chenxdu@cisco.com>2018-09-19 16:42:15 +0200
committerDonald Hunter <donaldh@cisco.com>2018-10-01 10:23:50 +0100
commitfd1060d4c176272f312fb23495ff8cdbebc121ae (patch)
tree2f24090ec71e47e69bd392918198745d0c8406e8
parenta789d153737a991c14c7be03ae9044563573e4d2 (diff)
PNDA Telemetry app for virtual firwall use case
The telemetry app ingests virtual firewall VES events into HDFS and the timeseries datastore to support futher analytics. Change-Id: I3a0920d4b416c1c165311ab9ff0fc31d8c96499f Signed-off-by: chenxdu <chenxdu@cisco.com> Issue-ID: DCAEGEN2-632 Signed-off-by: Donald Hunter <donaldh@cisco.com>
-rw-r--r--.gitignore2
-rw-r--r--pnda-ztt-app/.gitignore8
-rw-r--r--pnda-ztt-app/Makefile40
-rw-r--r--pnda-ztt-app/README.md13
-rw-r--r--pnda-ztt-app/assembly.sbt27
-rw-r--r--pnda-ztt-app/build.sbt34
-rw-r--r--pnda-ztt-app/pom.xml72
-rw-r--r--pnda-ztt-app/project/assembly.sbt1
-rw-r--r--pnda-ztt-app/project/build.properties3
-rw-r--r--pnda-ztt-app/project/plugins.sbt3
-rw-r--r--pnda-ztt-app/src/main/resources/dataplatform-raw.avsc10
-rw-r--r--pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml17
-rw-r--r--pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml19
-rw-r--r--pnda-ztt-app/src/main/resources/meta/fib-summary.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml36
-rw-r--r--pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml16
-rw-r--r--pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml58
-rw-r--r--pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml23
-rw-r--r--pnda-ztt-app/src/main/resources/meta/logging-stats.yaml11
-rw-r--r--pnda-ztt-app/src/main/resources/meta/memory-detail.yaml31
-rw-r--r--pnda-ztt-app/src/main/resources/meta/memory-summary.yaml25
-rw-r--r--pnda-ztt-app/src/main/resources/meta/rib-oper.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml12
-rw-r--r--pnda-ztt-app/src/main/resources/meta/ves-nic.yaml12
-rw-r--r--pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml27
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala93
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java106
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java106
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java54
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala63
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java63
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala81
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala70
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala110
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala25
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala24
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala35
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala24
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala99
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala32
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala62
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala22
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala32
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala70
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala76
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala26
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala74
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala48
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala104
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala50
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala71
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala103
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala52
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala52
-rw-r--r--pnda-ztt-app/src/test/resources/application.properties8
-rw-r--r--pnda-ztt-app/src/test/resources/log4j.testing.properties14
-rw-r--r--pnda-ztt-app/src/test/resources/meta/test-one.yaml14
-rw-r--r--pnda-ztt-app/src/test/resources/meta/test-three.yaml14
-rw-r--r--pnda-ztt-app/src/test/resources/meta/test-two.yaml14
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala122
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala95
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala43
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala61
-rw-r--r--pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala106
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore1
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties8
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties13
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json8
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json10
-rw-r--r--pnda-ztt-app/test/application.properties8
-rw-r--r--pnda-ztt-app/test/log4j.testing.properties14
-rw-r--r--pom.xml41
74 files changed, 3029 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..42f4a1a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+target/
+*~
diff --git a/pnda-ztt-app/.gitignore b/pnda-ztt-app/.gitignore
new file mode 100644
index 0000000..93ed9eb
--- /dev/null
+++ b/pnda-ztt-app/.gitignore
@@ -0,0 +1,8 @@
+target/
+.project
+.classpath
+.settings/
+/bin/
+.cache-main
+.cache-tests
+PndaZTTApp.jar
diff --git a/pnda-ztt-app/Makefile b/pnda-ztt-app/Makefile
new file mode 100644
index 0000000..7c08bde
--- /dev/null
+++ b/pnda-ztt-app/Makefile
@@ -0,0 +1,40 @@
+SERVER=https://knox.example.com:8443/gateway/pnda/deployment
+APP=src/universal/sparkStreaming/PndaZTTApp/PndaZTTApp.jar
+PACKAGE=pnda-ztt-app-0.0.3.tar.gz
+
+app: ## Build the application jar
+ sbt assembly
+
+package: ## Build the deployable package
+ sbt 'universal:packageZipTarball'
+
+deploy: ## Deploy the package to PNDA cluster
+ curl -k -u pnda:pnda -X PUT $(SERVER)/packages/$(PACKAGE) --data-binary @target/universal/$(PACKAGE) > /dev/null
+
+list: ## Show the deployed packages
+ curl $(SERVER)/packages
+
+delete: ## Delete the deployed package
+ curl -XDELETE $(SERVER)/packages/$(PACKAGE)
+
+test/PndaZTTApp.jar: $(APP) test/application.properties
+ cp $< $@
+ jar uvf $@ -C test application.properties
+
+test: test/PndaZTTApp.jar ## Run the assembly with spark-submit
+ spark-submit \
+ --driver-java-options "-Dlog4j.configuration=file://$(PWD)/test/log4j.testing.properties" \
+ --class com.cisco.ztt.App \
+ --master 'local[4]' --deploy-mode client \
+ test/PndaZTTApp.jar
+
+clean: ## Run sbt clean
+ sbt clean
+ rm -f $(APP)
+ rm -f test/PndaZTTApp.jar
+
+help: ## This help
+ @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
+
+.PHONY: help test
+.DEFAULT_GOAL := help
diff --git a/pnda-ztt-app/README.md b/pnda-ztt-app/README.md
new file mode 100644
index 0000000..9ab1d53
--- /dev/null
+++ b/pnda-ztt-app/README.md
@@ -0,0 +1,13 @@
+# PNDA Zero Touch Telemetry
+
+## Overview
+
+The Zero Touch Telemetry application is a configurable telemetry-to-OpenTSDB solution. Metadata
+files are used to configure the ZTT application for each telemetry source.
+
+The application receives telemetry events from Kafka, transforms the contents into a set of
+timeseries datapoints and writes them to OpenTSDB.
+
+This application demonstrates ingest of VES events from a virtual firewall into PNDA. The
+extracted metrics get stored in HDFS and the timeseries database. This enables direct
+visualization via Grafana as well as downstream Spark based analytics.
diff --git a/pnda-ztt-app/assembly.sbt b/pnda-ztt-app/assembly.sbt
new file mode 100644
index 0000000..7c9c71f
--- /dev/null
+++ b/pnda-ztt-app/assembly.sbt
@@ -0,0 +1,27 @@
+import AssemblyKeys._
+
+assemblySettings
+
+jarName in assembly := "PndaZTTApp.jar"
+
+target in assembly:= file("src/universal/sparkStreaming/PndaZTTApp")
+
+mergeStrategy in assembly := {
+ case PathList("META-INF", "jboss-beans.xml") => MergeStrategy.first
+ case PathList("META-INF", "mailcap") => MergeStrategy.discard
+ case PathList("META-INF", "maven", "org.slf4j", "slf4j-api", xa @ _*) => MergeStrategy.rename
+ case PathList("META-INF", "ECLIPSEF.RSA") => MergeStrategy.discard
+ case PathList("META-INF", "mimetypes.default") => MergeStrategy.first
+ case PathList("com", "datastax", "driver", "core", "Driver.properties") => MergeStrategy.last
+ case PathList("com", "esotericsoftware", "minlog", xx @ _*) => MergeStrategy.first
+ case PathList("plugin.properties") => MergeStrategy.discard
+ case PathList("javax", "activation", xw @ _*) => MergeStrategy.first
+ case PathList("org", "apache", "hadoop", "yarn", xv @ _*) => MergeStrategy.first
+ case PathList("org", "apache", "commons", xz @ _*) => MergeStrategy.first
+ case PathList("org", "jboss", "netty", ya @ _*) => MergeStrategy.first
+ case PathList("org", "apache", "spark", ya @ _*) => MergeStrategy.first
+ case x => {
+ val oldStrategy = (mergeStrategy in assembly).value
+ oldStrategy(x)
+ }
+}
diff --git a/pnda-ztt-app/build.sbt b/pnda-ztt-app/build.sbt
new file mode 100644
index 0000000..5811d22
--- /dev/null
+++ b/pnda-ztt-app/build.sbt
@@ -0,0 +1,34 @@
+name := "pnda-ztt-app"
+
+version := "0.0.3"
+
+scalaVersion := "2.10.6"
+
+enablePlugins(UniversalPlugin)
+
+packageZipTarball in Universal := {
+ val originalFileName = (packageZipTarball in Universal).value
+ val (base, ext) = originalFileName.baseAndExt
+ val newFileName = file(originalFileName.getParent) / (base + ".tar.gz")
+ IO.move(originalFileName, newFileName)
+ newFileName
+}
+
+libraryDependencies ++= Seq(
+ "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
+ "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
+ "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" ,
+ "org.apache.kafka" %% "kafka" % "0.8.2.1",
+ "org.apache.avro" % "avro" % "1.7.7",
+ "joda-time" % "joda-time" % "2.8.1" % "provided",
+ "log4j" % "log4j" % "1.2.17" % "provided",
+ "org.apache.httpcomponents" % "httpcore" % "4.2.5" % "provided",
+ "org.apache.httpcomponents" % "httpclient" % "4.2.5" % "provided",
+ "com.fasterxml.jackson.core" % "jackson-databind" % "2.2.3" % "provided",
+ "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.2.3",
+ "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.2.3" % "provided",
+ "org.springframework" % "spring-core" % "4.3.10.RELEASE",
+ "org.scalatest" % "scalatest_2.10" % "3.0.1" % "test"
+)
+
+EclipseKeys.withSource := true
diff --git a/pnda-ztt-app/pom.xml b/pnda-ztt-app/pom.xml
new file mode 100644
index 0000000..c792a88
--- /dev/null
+++ b/pnda-ztt-app/pom.xml
@@ -0,0 +1,72 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.onap.dcaegen2.analytics.pnda</groupId>
+ <artifactId>pnda-ztt-app</artifactId>
+ <packaging>pom</packaging>
+ <description>pnda-ztt-app</description>
+ <version>1.0.0-SNAPSHOT</version>
+ <name>pnda-ztt-app</name>
+ <organization>
+ <name>default</name>
+ </organization>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-sbt</groupId>
+ <artifactId>sbt-launch</artifactId>
+ <version>1.0.0-M4</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.6.0</version>
+ <executions>
+ <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>java</executable>
+ <classpathScope>compile</classpathScope>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath/>
+ <argument>xsbt.boot.Boot</argument>
+ <argument>assembly</argument>
+ <argument>universal:packageZipTarball</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${project.build.directory}/universal/${project.name}-0.0.3.tar.gz</file>
+ <type>tar.gz</type>
+ <classifier>app</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pnda-ztt-app/project/assembly.sbt b/pnda-ztt-app/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/pnda-ztt-app/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/pnda-ztt-app/project/build.properties b/pnda-ztt-app/project/build.properties
new file mode 100644
index 0000000..62488aa
--- /dev/null
+++ b/pnda-ztt-app/project/build.properties
@@ -0,0 +1,3 @@
+sbt.version=0.13.16
+
+resolvers += "Artima Maven Repository" at "https://repo.artima.com/releases"
diff --git a/pnda-ztt-app/project/plugins.sbt b/pnda-ztt-app/project/plugins.sbt
new file mode 100644
index 0000000..b03dd6c
--- /dev/null
+++ b/pnda-ztt-app/project/plugins.sbt
@@ -0,0 +1,3 @@
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.1")
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
+addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.1.1")
diff --git a/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc
new file mode 100644
index 0000000..5450771
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc
@@ -0,0 +1,10 @@
+{"namespace": "com.cisco.pnda",
+ "type": "record",
+ "name": "PndaRecord",
+ "fields": [
+ {"name": "timestamp", "type": "long"},
+ {"name": "src", "type": "string"},
+ {"name": "host_ip", "type": "string"},
+ {"name": "rawdata", "type": "bytes"}
+ ]
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml
new file mode 100644
index 0000000..5d56f77
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml
@@ -0,0 +1,17 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: bgp
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ipv4-bgp-oper:bgp/instances/instance/instance-active/default-vrf/afs/af/neighbor-af-table/neighbor
+ keys:
+ - name: af-name
+ display_name: "Address Family Name"
+ - name: instance-name
+ display_name: "Instance Name"
+ - name: neighbor-address
+ display_name: "Neighbor Address"
+ content:
+ - name: connection-up-count
+ display_name: "Connection Up Count"
diff --git a/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml
new file mode 100644
index 0000000..0ce8870
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml
@@ -0,0 +1,19 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: cpu
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: total-cpu-one-minute
+ display_name: "One-minute CPU Total"
+
+ - name: total-cpu-five-minute
+ display_name: "Five-minute CPU Total"
+
+ - name: total-cpu-fifteen-minute
+ display_name: "Fifteen-minute CPU Total"
diff --git a/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml
new file mode 100644
index 0000000..2dbb4ae
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml
@@ -0,0 +1,27 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: fib
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/fib-summaries/fib-summary
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ - name: protocol-name
+ display_name: "Protocol Name"
+ - name: vrf-name
+ display_name: "VRF Name"
+ content:
+ - name: extended-prefixes
+ display_name: "Num Extended Prefixes"
+
+ - name: forwarding-elements
+ display_name: "Num Forwarding Elements"
+
+ - name: next-hops
+ display_name: "Num Next Hops"
+
+ - name: routes
+ display_name: "Num Routes"
diff --git a/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml
new file mode 100644
index 0000000..24fb808
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml
@@ -0,0 +1,36 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: interface
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+ ts_name: bytes-in # rename the metric in OpenTSDB
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
+ ts_name: bytes-out # rename the metric in OpenTSDB
+
+ - name: packets-received
+ display_name: "Packets Received"
+
+ - name: packets-sent
+ display_name: "Packets Sent"
+
+ - name: broadcast-packets-received
+ display_name: "Broadcast Packets Received"
+
+ - name: broadcast-packets-sent
+ display_name: "Broadcast Packets Sent"
+
+ - name: multicast-packets-received
+ display_name: "Multicast Packets Received"
+
+ - name: multicast-packets-sent
+ display_name: "Multicast Packets Sent"
diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml
new file mode 100644
index 0000000..3bc9689
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-invmgr-oper:inventory/entities/entity/attributes/inv-basic-bag
+ keys:
+ - name: name
+ display_name: "Entity Name"
+ content:
+ - name: serial-number
+ display_name: "Serial Number"
+
+ - name: description
+ display_name: "Description"
+
+ - name: manufacturer-name
+ display_name: "Manufacturer"
+
+ - name: model-name
+ display_name: "Model Name"
+
+ - name: software-revision
+ display_name: "Software Revision"
+
+ - name: vendor-type
+ display_name: "Vendor OID"
diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml
new file mode 100644
index 0000000..db6386d
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-invmgr-oper:inventory/racks/rack/attributes/inv-basic-bag
+ keys:
+ - name: name
+ display_name: "Entity Name"
+ content:
+ - name: serial-number
+ display_name: "Serial Number"
+
+ - name: description
+ display_name: "Description"
+
+ - name: manufacturer-name
+ display_name: "Manufacturer"
+
+ - name: model-name
+ display_name: "Model Name"
+
+ - name: software-revision
+ display_name: "Software Revision"
+
+ - name: vendor-type
+ display_name: "Vendor OID"
diff --git a/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml
new file mode 100644
index 0000000..3a5dcba
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml
@@ -0,0 +1,16 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: ipv6
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: ipv6.total-packets
+ display_name: "Total IPV6 Packets"
+
+ - name: icmp.total-messages
+ display_name: "Total ICMP Messages"
diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml
new file mode 100644
index 0000000..2360de6
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml
@@ -0,0 +1,58 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/summaries/summary
+ keys:
+ - name: device-id
+ display_name: "Device Id"
+ - name: interface-name
+ display_name: "Interface Name"
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: chassis-id
+ display_name: "Chassis Id"
+
+ - name: device-id
+ display_name: "Device Id"
+
+ - name: port-id-detail
+ display_name: "Port Id"
+
+ - name: receiving-interface-name
+ display_name: "Receiving Interface Name"
+
+ - name: enabled-capabilities
+ display_name: "Enabled Capabilities"
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device
+ keys:
+ - name: device-id
+ display_name: "Device Id"
+ - name: interface-name
+ display_name: "Interface Name"
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: lldp-neighbor.chassis-id
+ display_name: "Chassis Id"
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/details/detail
+ keys:
+ - name: device-id
+ display_name: "Device Id"
+ - name: interface-name
+ display_name: "Interface Name"
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: lldp-neighbor.chassis-id
+ display_name: "Chassis Id"
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/interfaces/interface
+ keys:
+ - name: interface-name
+ - name: node-name
+ content:
+ - name: interface-name
+ - name: if-index
diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml
new file mode 100644
index 0000000..e29e32c
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml
@@ -0,0 +1,23 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: lldp.stats
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/statistics
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: received-packets
+ - name: transmitted-packets
+ - name: aged-out-entries
+ - name: bad-packets
+ - name: discarded-packets
+ - name: discarded-tl-vs
+ - name: encapsulation-errors
+ - name: out-of-memory-errors
+ - name: queue-overflow-errors
+ - name: table-overflow-errors
+ - name: unrecognized-tl-vs
diff --git a/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml
new file mode 100644
index 0000000..5d12152
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml
@@ -0,0 +1,11 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: logging
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics
+ content:
+ - name: buffer-logging-stats.message-count
+ ts_name: message-count
+ display_name: "Serial Number"
diff --git a/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml
new file mode 100644
index 0000000..6b3f657
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml
@@ -0,0 +1,31 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: memory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/detail
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: allocated-memory
+ display_name: "Allocated Memory"
+
+ - name: free-application-memory
+ display_name: "Free Application Memory"
+
+ - name: free-physical-memory
+ display_name: "Free Physical Memory"
+
+ - name: ram-memory
+ display_name: "RAM Memory"
+
+ - name: program-data
+ display_name: "Program Data"
+
+ - name: program-stack
+ display_name: "Program Stack"
+
+ - name: program-text
+ display_name: "Program Text"
diff --git a/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml
new file mode 100644
index 0000000..02adef6
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml
@@ -0,0 +1,25 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: memory.summary
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/summary
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: allocated-memory
+ display_name: "Allocated Memory"
+
+ - name: free-application-memory
+ display_name: "Free Application Memory"
+
+ - name: free-physical-memory
+ display_name: "Free Physical Memory"
+
+ - name: ram-memory
+ display_name: "RAM Memory"
+
+ - name: system-ram-memory
+ display_name: "System RAM Memopry"
diff --git a/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml
new file mode 100644
index 0000000..197b0b9
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: rib
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ip-rib-ipv4-oper:rib/vrfs/vrf/afs/af/safs/saf/ip-rib-route-table-names/ip-rib-route-table-name/protocol/bgp/as/information
+ keys:
+ - name: af-name
+ display_name: "Address Family Name"
+ - name: as
+ display_name: "Address Family"
+ - name: route-table-name
+ display_name: "Route table name"
+ - name: saf-name
+ display_name: "Saf name"
+ - name: vrf-name
+ display_name: "Vrf name"
+ content:
+ - name: active-routes-count
+ display_name: "Active Routes Count"
+ - name: instance
+ display_name: "Instance"
+ - name: paths-count
+ display_name: "Paths Count"
+ - name: routes-counts
+ display_name: "Routes Count"
diff --git a/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml
new file mode 100644
index 0000000..aaf1de8
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml
@@ -0,0 +1,12 @@
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: cpu
+
+ves_telemetry:
+ - path: measurementsForVfScalingFields/cpuUsageArray
+ keys:
+ - name: cpuIdentifier
+ content:
+ - name: percentUsage
diff --git a/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml
new file mode 100644
index 0000000..a6ae3de
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml
@@ -0,0 +1,12 @@
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: nic
+
+ves_telemetry:
+ - path: measurementsForVfScalingFields/vNicUsageArray
+ keys:
+ - name: vNicIdentifier
+ content:
+ - name: receivedTotalPacketsDelta
diff --git a/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml
new file mode 100644
index 0000000..df466c3
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml
@@ -0,0 +1,27 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: fib.vrf
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/vrfs/vrf/summary
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ - name: protocol-name
+ display_name: "Protocol Name"
+ - name: vrf-name
+ display_name: "VRF Name"
+ content:
+ - name: extended-prefixes
+ display_name: "Num Extended Prefixes"
+
+ - name: forwarding-elements
+ display_name: "Num Forwarding Elements"
+
+ - name: next-hops
+ display_name: "Num Next Hops"
+
+ - name: routes
+ display_name: "Num Routes"
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
new file mode 100644
index 0000000..b143b62
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: StatReporter
+ * Purpose: Report batch processing metrics to the PNDA metric logger
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.pnda
+
+import scala.util.control.NonFatal
+import java.io.StringWriter
+import java.io.PrintWriter
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
+import org.apache.log4j.Logger
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+
+class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
+
+ private[this] val logger = Logger.getLogger(getClass().getName())
+
+ override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
+ def doSend(metricName: String, metricValue: String) = {
+ try {
+ val httpClient = new DefaultHttpClient()
+ val post = new HttpPost(metricsUrl)
+ post.setHeader("Content-type", "application/json")
+ val ts = java.lang.System.currentTimeMillis()
+ val body = f"""{
+ | "data": [{
+ | "source": "application.$appName",
+ | "metric": "application.kpi.$appName.$metricName",
+ | "value": "$metricValue",
+ | "timestamp": $ts%d
+ | }],
+ | "timestamp": $ts%d
+ |}""".stripMargin
+
+ logger.debug(body)
+ post.setEntity(new StringEntity(body))
+ val response = httpClient.execute(post)
+ if (response.getStatusLine.getStatusCode() != 200) {
+ logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode())
+ }
+
+ } catch {
+ case NonFatal(t) => {
+ logger.error("POST failed: " + metricsUrl)
+ val sw = new StringWriter
+ t.printStackTrace(new PrintWriter(sw))
+ logger.error(sw.toString)
+ }
+ }
+ }
+ doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
+ doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
+ doSend("num-records", batch.batchInfo.numRecords.toString())
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
new file mode 100644
index 0000000..1a08bd4
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEvent
+ * Purpose: Data model class for an avro event on Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class DataPlatformEvent implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+ protected static ObjectMapper _mapper = new ObjectMapper();
+
+ private String _src;
+ private Long _timestamp;
+ private String _hostIp;
+ private String _rawdata;
+
+ public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata)
+ {
+ _src = src;
+ _timestamp = timestamp;
+ _hostIp = host_ip;
+ _rawdata = rawdata;
+ }
+
+ public String getSrc()
+ {
+ return _src;
+ }
+
+ public Long getTimestamp()
+ {
+ return _timestamp;
+ }
+
+ public String getHostIp()
+ {
+ return _hostIp;
+ }
+
+ public String getRawdata()
+ {
+ return _rawdata;
+ }
+
+ @Override
+ public String toString()
+ {
+ try
+ {
+ return _mapper.writeValueAsString(this);
+ }
+ catch (Exception ex)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ boolean result = false;
+ if (other instanceof DataPlatformEvent)
+ {
+ DataPlatformEvent that = (DataPlatformEvent) other;
+ result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc())))
+ && (this.getTimestamp() == that.getTimestamp() || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
+ && (this.getHostIp() == that.getHostIp() || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp())))
+ && (this.getRawdata() == that.getRawdata() || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata())));
+ }
+ return result;
+
+ }
+
+ public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException
+ {
+ return _mapper.readTree(_rawdata);
+ }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
new file mode 100644
index 0000000..ef2bc0d
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEventDecoder
+ * Purpose: Encodes and secodes binary event data from kafka to/from a DataPlatformEvent object
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.log4j.Logger;
+
+
+public class DataPlatformEventCodec
+{
+ private static final Logger LOGGER = Logger.getLogger(DataPlatformEventCodec.class.getName());
+
+ private Schema.Parser _parser = new Schema.Parser();
+ private DatumReader<GenericRecord> _reader;
+ private DatumWriter<GenericRecord> _writer;
+ private Schema _schema;
+ private String _schemaDef;
+
+ public DataPlatformEventCodec(String schemaDef) throws IOException
+ {
+ _schemaDef = schemaDef;
+ _schema = _parser.parse(schemaDef);
+ _reader = new GenericDatumReader<GenericRecord>(_schema);
+ _writer = new GenericDatumWriter<GenericRecord>(_schema);
+ }
+
+ public DataPlatformEvent decode(byte[] data) throws IOException
+ {
+ try
+ {
+ Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+ GenericRecord r = _reader.read(null, decoder);
+ return new DataPlatformEvent((String) r.get("src").toString(),
+ (Long) r.get("timestamp"),
+ (String) r.get("host_ip").toString(),
+ new String(((ByteBuffer)r.get("rawdata")).array()));
+ }
+ catch(Exception ex)
+ {
+ LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
+ throw new IOException(ex);
+ }
+ }
+
+ final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();
+ public static String hexStr(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for ( int j = 0; j < bytes.length; j++ ) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = hexArray[v >>> 4];
+ hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
+
+ public byte[] encode(DataPlatformEvent e) throws IOException
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ GenericRecord datum = new GenericData.Record(_schema);
+ datum.put("src", e.getSrc());
+ datum.put("timestamp", e.getTimestamp());
+ datum.put("host_ip", e.getHostIp());
+ datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
+ _writer.write(datum, encoder);
+ encoder.flush();
+ out.close();
+ return out.toByteArray();
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
new file mode 100644
index 0000000..24a9d35
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: StaticHelpers
+ * Purpose: Helper functions
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class StaticHelpers {
+ public static String loadResourceFile(String path)
+ {
+ InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
+ try
+ {
+ char[] buf = new char[2048];
+ Reader r = new InputStreamReader(is, "UTF-8");
+ StringBuilder s = new StringBuilder();
+ while (true)
+ {
+ int n = r.read(buf);
+ if (n < 0)
+ break;
+ s.append(buf, 0, n);
+ }
+ return s.toString();
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
new file mode 100644
index 0000000..6bc2083
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import com.cisco.pnda.StatReporter
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.YamlReader
+import org.apache.log4j.BasicConfigurator
+
+object App {
+
+ private[this] val logger = Logger.getLogger(getClass().getName())
+
+ def main(args: Array[String]) {
+
+ BasicConfigurator.configure();
+
+ val props = AppConfig.loadProperties();
+ val loggerUrl = props.getProperty("environment.metric_logger_url")
+ val appName = props.getProperty("component.application")
+ val checkpointDirectory = props.getProperty("app.checkpoint_path");
+ val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+ val metadata = YamlReader.load()
+ if (metadata.units.length == 0) {
+ logger.error("Trying to run app without metadata")
+ System.exit(1)
+ }
+ val pipeline = new ZttPipeline(metadata)
+
+ // Create the streaming context, or load a saved one from disk
+ val ssc = if (checkpointDirectory.length() > 0)
+ StreamingContext.getOrCreate(checkpointDirectory, pipeline.create) else pipeline.create();
+
+ sys.ShutdownHookThread {
+ logger.info("Gracefully stopping Spark Streaming Application")
+ ssc.stop(true, true)
+ logger.info("Application stopped")
+ }
+
+ if (loggerUrl != null) {
+ logger.info("Reporting stats to url: " + loggerUrl)
+ ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+ }
+ logger.info("Starting spark streaming execution")
+ ssc.start()
+ ssc.awaitTermination()
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
new file mode 100644
index 0000000..a711cab
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: AppConfig
+ * Purpose: Load application properties file.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.ztt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class AppConfig
+{
+ private static final Logger LOGGER = Logger.getLogger(AppConfig.class);
+
+ private static Properties _properties;
+ private static Object _lock = new Object();
+
+ public static Properties loadProperties()
+ {
+ synchronized (_lock)
+ {
+ if (_properties == null)
+ {
+ _properties = new Properties();
+ try
+ {
+ InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties");
+ _properties.load(is);
+ is.close();
+ LOGGER.info("Properties loaded");
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Failed to load properties", e);
+ System.exit(1);
+ }
+ }
+ return _properties;
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
new file mode 100644
index 0000000..c0bc61e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: KafkaInput
+ * Purpose: Generate a dstream from Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.StringDecoder
+import org.apache.log4j.Logger
+
+class KafkaInput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def readFromKafka(ssc: StreamingContext, topic: String) = {
+ val props = AppConfig.loadProperties();
+ val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+ if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
+ kafkaParams.put("auto.offset.reset", "smallest");
+ }
+
+ Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+ Holder.logger.info("Registering with kafka using topic " + topic)
+
+ val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
+ ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+
+ // Decode avro container format
+ val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+ val rawMessages = messages.map(x => {
+ val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
+ val payload = x._2;
+ val dataPlatformEvent = eventDecoder.decode(payload);
+ dataPlatformEvent;
+ });
+ rawMessages;
+ };
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
new file mode 100644
index 0000000..1041d00
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+
+
+
+import java.io.StringWriter
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.github.benfradet.spark.kafka.writer.dStreamToKafkaWriter
+
+
+class KafkaOutput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def writeToKafka(output: DStream[Payload]) = {
+
+ val props = AppConfig.loadProperties();
+ val producerConfig = Map(
+ "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+ "key.serializer" -> classOf[StringSerializer].getName,
+ "value.serializer" -> classOf[ByteArraySerializer].getName
+)
+ output.writeToKafka(
+ producerConfig,
+ s => {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, s.datapoint)
+ val json = out.toString()
+
+ val event = new DataPlatformEvent(s.publishSrc, s.timestamp, s.hostIp, json)
+ val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+ val codec = new DataPlatformEventCodec(avroSchemaString);
+
+ new ProducerRecord[String, Array[Byte]](s.publishTopic, codec.encode(event))
+ }
+ )
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
new file mode 100644
index 0000000..9077986
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: OpenTSDBOutput
+ * Purpose: Write a dstream to OpenTSDB
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import scala.Iterator
+
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import scala.collection.mutable.ArrayBuffer
+
+class OpenTSDBOutput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def putOpentsdb[T](opentsdbIP: String,
+ input: DStream[Payload]): DStream[Payload] = {
+ input.mapPartitions(partition => {
+ var size = 0
+ val output = partition.grouped(20).flatMap(group => {
+ val data = ArrayBuffer[TimeseriesDatapoint]()
+ val passthru = group.map(item => {
+ data += item.datapoint
+ item
+ })
+
+ size += data.length
+
+ if (data.length > 0) {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, data)
+ val json = out.toString()
+
+ Holder.logger.debug("Posting " + data.length + " datapoints to OpenTSDB")
+ Holder.logger.debug(json)
+
+ if (opentsdbIP != null && opentsdbIP.length() > 0) {
+ val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
+ try {
+ val httpClient = new DefaultHttpClient()
+ val post = new HttpPost(openTSDBUrl)
+ post.setHeader("Content-type", "application/json")
+ post.setEntity(new StringEntity(json))
+ val response = httpClient.execute(post)
+ // Holder.logger.debug(EntityUtils.toString(response.getEntity()))
+ } catch {
+ case t: Throwable => {
+ Holder.logger.warn(t)
+ }
+ }
+ }
+ } else {
+ Holder.logger.debug("No datapoints to post to OpenTSDB")
+ }
+ passthru
+ })
+ output
+ });
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
new file mode 100644
index 0000000..2961bd0
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class Payload(
+ val publishSrc: String,
+ val publishTopic: String,
+ val hostIp: String,
+ val timestamp: Long,
+ val datapoint: TimeseriesDatapoint) {
+
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
new file mode 100644
index 0000000..7e447bf
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class TimeseriesDatapoint(
+ val metric: String,
+ val value: String,
+ val timestamp: String,
+ val tags: Map[String, String]) {
+
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
new file mode 100644
index 0000000..aae1e8c
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+import com.cisco.ztt.meta.Metadata
+import com.cisco.ztt.cisco.xr.telemetry.TelemetryDispatcher
+import com.cisco.ztt.ves.telemetry.VesTransformer
+
+class TransformManager(metadata: Metadata) {
+
+ val transformers: Array[Transformer] = metadata.units.map( unit => {
+
+ unit.format match {
+ case "ves" => new VesTransformer(unit)
+ case "cisco.xr.telemetry" => new TelemetryDispatcher(unit)
+ case _ => new TelemetryDispatcher(unit)
+ }
+ })
+
+ val byTopic = transformers.groupBy( t => { t.inputTopic })
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
new file mode 100644
index 0000000..861eb5e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import com.cisco.pnda.model.DataPlatformEvent
+import org.apache.spark.streaming.dstream.DStream
+
+trait Transformer {
+ def inputTopic: String
+ def transform(event: DataPlatformEvent): (Boolean, Set[Payload])
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
new file mode 100644
index 0000000..c3b3532
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: KafkaPipeline
+ * Purpose: Set up the spark streaming processing graph.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+import com.cisco.ztt.meta.Metadata
+import com.cisco.pnda.model.DataPlatformEvent
+
+class ZttPipeline(metadata: Metadata) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def create() = {
+ val props = AppConfig.loadProperties();
+ val checkpointDirectory = props.getProperty("app.checkpoint_path");
+ val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+ val sparkConf = new SparkConf();
+ Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
+ val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds));
+
+ if (checkpointDirectory.length() > 0) {
+ ssc.checkpoint(checkpointDirectory)
+ }
+
+ val transformManager = new TransformManager(metadata)
+ val streams = transformManager.byTopic.map( x => {
+ val topic = x._1
+ val transformers = x._2
+
+ val inputStream = new KafkaInput().readFromKafka(ssc, topic)
+
+ val timeseriesStream = inputStream.flatMap(dataPlatformEvent => {
+ var handled = false;
+ val datapoints = transformers.flatMap(transformer => {
+ val (ran, data) = transformer.transform(dataPlatformEvent)
+ handled |= ran;
+ data;
+ })
+ if (!handled) {
+ Holder.logger.info("Did not process " + dataPlatformEvent.getRawdata)
+ }
+ datapoints
+ })
+
+ val outputStream =
+ new OpenTSDBOutput().putOpentsdb(
+ props.getProperty("opentsdb.ip"),
+ timeseriesStream);
+
+ new KafkaOutput().writeToKafka(outputStream)
+ });
+
+ ssc;
+ }: StreamingContext
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
new file mode 100644
index 0000000..00470ff
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Telemetry
+
+class InventoryMapper(config: Telemetry) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+ Holder.logger.debug("InventoryMapper is sinking " + config.path)
+ Set[TimeseriesDatapoint]()
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
new file mode 100644
index 0000000..97f4da3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.JInt
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods
+import org.json4s.jvalue2extractable
+import org.json4s.string2JsonInput
+import scala.reflect.ManifestFactory.arrayType
+import scala.reflect.ManifestFactory.classType
+import org.json4s.JsonAST.JObject
+
+
+case class TRow(Timestamp: String, Keys: Option[Map[String, String]], Content: Map[String, JValue])
+case class THeader(node_id_str: String, subscription_id_str: String,
+ encoding_path: String, collection_id: Int, collection_start_time: Int,
+ msg_timestamp: Int, collection_end_time: Int)
+case class TEvent(Source: String, Telemetry: THeader, Rows: Array[TRow])
+
+
+object JsonParser {
+
+ def parse(json: String): TEvent = {
+ implicit val formats = DefaultFormats
+
+ val parsed = JsonMethods.parse(json)
+ val event = parsed.extract[TEvent]
+ event
+ }
+
+ def array(value: JValue): Array[Map[String,JValue]] = {
+ implicit val formats = DefaultFormats
+ val array = value.asInstanceOf[JArray]
+ array.extract[Array[Map[String, JValue]]]
+ }
+
+ def map(value: JValue): Map[String,JValue] = {
+ implicit val formats = DefaultFormats
+ val map = value.asInstanceOf[JObject]
+ map.extract[Map[String, JValue]]
+ }
+
+ def int(value: JValue): String = {
+ value.asInstanceOf[JInt].num.toString
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
new file mode 100644
index 0000000..2983e88
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+
+trait Mapper {
+ def transform(event: TEvent): Set[TimeseriesDatapoint]
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
new file mode 100644
index 0000000..9f9c775
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+
+
+class NullMapper(path: String) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+ Holder.logger.debug("NullMapper is sinking " + path)
+ Set[TimeseriesDatapoint]()
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
new file mode 100644
index 0000000..b7deadc
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.TimeseriesDatapoint
+import com.cisco.ztt.Transformer
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+
+
+class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
+ val transformers = unit.xr_telemetry.map( d => {
+ Holder.logger.warn("Choosing mapper for " + unit.processor)
+ unit.processor match {
+ case "timeseries" => d.path -> new TimeseriesMapper(d, unit.timeseries_namespace)
+ case "inventory" => d.path -> new InventoryMapper(d)
+ case _ => d.path -> new NullMapper(d.path)
+ }
+ }).toMap
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ Holder.logger.trace(rawEvent.getRawdata())
+
+ try {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val event = JsonParser.parse(rawEvent.getRawdata())
+ val path = event.Telemetry.encoding_path
+ if (transformers.contains(path)) {
+ Holder.logger.debug("Transforming for " + path)
+ val datapoints = transformers(path).transform(event)
+ val payloads = datapoints.map(d => {
+ new Payload(source, unit.output_topic,
+ rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ (true, payloads)
+ } else {
+ Holder.logger.trace("No transformer in unit for " + path)
+ (false, Set[Payload]())
+ }
+ } catch {
+ case t: Throwable => {
+ Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+ (false, Set[Payload]())
+ }
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
new file mode 100644
index 0000000..6c9ad80
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JObject
+import org.json4s.JsonAST.JValue
+import com.cisco.ztt.meta.Item
+import com.cisco.ztt.meta.Telemetry
+
+class TimeseriesMapper(config: Telemetry, namespace: String) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ val wantedKeys = config.keys.getOrElse(Array[Item]()).map( item => {
+ item.name -> item
+ }).toMap
+
+ val wantedValues = config.content.map( item => {
+ item.name -> item
+ }).toMap
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+
+ val timeseries = event.Rows.flatMap( row => {
+ val keys = row.Keys
+ .getOrElse(Map[String, String]())
+ .filter(k => wantedKeys.contains(k._1))
+ .map( k => {
+ val wanted = wantedKeys(k._1)
+ val name = if (wanted.ts_name != null) wanted.ts_name else k._1
+ name -> k._2
+ }).toMap + ("host" -> event.Telemetry.node_id_str)
+
+ // Flatten nested maps into container.key -> value
+ val expanded = row.Content.flatMap( v => {
+ if (v._2.isInstanceOf[JObject]) {
+ JsonParser.map(v._2).map( kv => {
+ (v._1 + "." + kv._1) -> kv._2
+ })
+ } else {
+ Map[String, JValue](v)
+ }
+ })
+
+ val items = expanded.filter(v => wantedValues.contains(v._1))
+ .map( data => {
+ val wanted = wantedValues(data._1)
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else data._1)
+ val value = JsonParser.int(data._2)
+ val datapoint = new TimeseriesDatapoint(name, value, row.Timestamp, keys)
+ datapoint
+ })
+ items.toSet
+ })
+
+ timeseries.toSet
+ }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
new file mode 100644
index 0000000..84f5bbb
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+class Metadata(val units: Array[Unit]) extends Serializable {
+ def topics() : Set[String] = {
+ val topics = units.map { case (unit : Unit) => {
+ unit.input_topic
+ }}
+
+ topics.toSet
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
new file mode 100644
index 0000000..4b72cb1
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.apache.log4j.Logger
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+case class Unit(
+ format: String,
+ input_topic: String,
+ processor: String,
+ output_topic: String,
+ publish_src: String,
+ timeseries_namespace: String,
+ xr_telemetry: Array[Telemetry],
+ ves_telemetry: Array[Telemetry])
+case class Telemetry(path: String, keys: Option[Array[Item]], content: Array[Item])
+case class Item(name: String, display_name: String, ts_name: String)
+
+object YamlReader {
+
+ private[this] val logger = Logger.getLogger(getClass().getName());
+
+ val mapper: ObjectMapper = new ObjectMapper(new YAMLFactory())
+ mapper.registerModule(DefaultScalaModule)
+
+ def load(pattern: String = "classpath*:meta/*.yaml"): Metadata = {
+ val patternResolver = new PathMatchingResourcePatternResolver()
+ val mappingLocations = patternResolver.getResources(pattern)
+ val units = mappingLocations.map(loc => {
+ logger.info("Reading metadata " + loc)
+ mapper.readValue(loc.getInputStream, classOf[Unit])
+ });
+ new Metadata(units)
+ }
+
+ def parse(yaml: String): Unit = {
+ val meta: Unit = mapper.readValue(yaml, classOf[Unit])
+ meta
+ }
+
+ def dump(meta: Unit) = {
+ println(" input = " + meta.input_topic)
+ println("output = " + meta.output_topic)
+ meta.xr_telemetry.map(m => {
+ println(" path = " + m.path)
+ println("keys:")
+ m.keys.getOrElse(Array[Item]()).map(item => {
+ println(" " + item.name + " -> " + item.display_name)
+ });
+ println("content:")
+ m.content.map(item => {
+ println(" " + item.name + " -> " + item.display_name)
+ });
+ });
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
new file mode 100644
index 0000000..4019262
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.meta.Telemetry
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JValue
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Item
+import org.json4s.JsonAST.JObject
+
+
+class VesMapper(config: Telemetry, namespace: String) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = {
+
+ val keys = config.keys.getOrElse(Array[Item]()).map( k => {
+ val value = map.get(k.name).get.toString()
+ k.name -> value
+ }).toMap + ("host" -> host)
+
+ val items = config.content.map( wanted => {
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else wanted.name)
+ val value = map.get(wanted.name).get.toString()
+
+ new TimeseriesDatapoint(name, value, timestamp, keys)
+ })
+
+ items.toSet
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
new file mode 100644
index 0000000..5eaf8f3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.Transformer
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+import org.apache.log4j.Logger
+import org.json4s.jackson.JsonMethods
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.JObject
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JArray
+
+
+class VesTransformer(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
+ val mappers = unit.ves_telemetry.map(d => {
+ d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace)
+ }) //.toMap
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val parsed = JsonMethods.parse(rawEvent.getRawdata)
+
+ if (! parsed.isInstanceOf[JObject]) {
+ Holder.logger.warn("Cannot process parsed JSON")
+ return (false, Set[Payload]())
+ }
+ val value = parsed.asInstanceOf[JObject].values.get("event").get.asInstanceOf[Map[String, JValue]]
+ val header = value.get("commonEventHeader").get.asInstanceOf[Map[String,Any]]
+ val host = header.get("reportingEntityName").get.toString
+ val timestamp = header.get("lastEpochMicrosec").get.toString.dropRight(3)
+
+ val generated = mappers.flatMap(r => {
+ val path = r._1
+ val mapper = r._2
+
+ val datapoints = visit(path, value, mapper, host, timestamp)
+ datapoints.map(d => {
+ new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ }).toSet
+ (true, generated)
+ }
+
+ def visit(
+ path: Array[String],
+ map: Map[String, Any],
+ mapper: VesMapper,
+ host: String,
+ timestamp: String): Set[TimeseriesDatapoint] = {
+ if (path.length > 0) {
+ val option = map.get(path.head)
+ option match {
+ case None => {
+ Holder.logger.warn("VES mapper failed to dereference JSON " + path.head)
+ return Set[TimeseriesDatapoint]()
+ }
+
+ case _ => {
+ option.get match {
+ case l: List[_] => {
+ val list = l.asInstanceOf[List[Map[String, Any]]]
+ return list.flatMap(sub => {
+ visit(path.tail, sub, mapper, host, timestamp)
+ }).toSet
+ }
+ case m: Map[_, _] => {
+ val sub = m.asInstanceOf[Map[String, Any]]
+ return visit(path.tail, sub, mapper, host, timestamp)
+ }
+
+ }
+ }
+ }
+ } else {
+ val datapoints = mapper.transform(map, host, timestamp)
+ return datapoints
+ }
+
+ Set[TimeseriesDatapoint]()
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
new file mode 100644
index 0000000..57339f8
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[DStream]]s to Kafka
+ * @param dStream [[DStream]] to be written to Kafka
+ */
+class DStreamKafkaWriter[T: ClassTag](@transient private val dStream: DStream[T])
+ extends KafkaWriter[T] with Serializable {
+ /**
+ * Write a [[DStream]] to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+ * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+ * @param callback an optional [[Callback]] to be called after each write, default value is None.
+ */
+ override def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit =
+ dStream.foreachRDD { rdd =>
+ val rddWriter = new RDDKafkaWriter[T](rdd)
+ rddWriter.writeToKafka(producerConfig, transformFunc, callback)
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
new file mode 100644
index 0000000..2fbf6bc
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import java.util.concurrent.{Callable, ExecutionException, TimeUnit}
+
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+/** Cache of [[KafkaProducer]]s */
+object KafkaProducerCache {
+ private type ProducerConf = Seq[(String, Object)]
+ private type ExProducer = KafkaProducer[_, _]
+
+ private val removalListener = new RemovalListener[ProducerConf, ExProducer]() {
+ override def onRemoval(notif: RemovalNotification[ProducerConf, ExProducer]): Unit =
+ notif.getValue.close()
+ }
+
+ private val cacheExpireTimeout = 10.minutes.toMillis
+ private val cache = CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
+ .removalListener(removalListener)
+ .build[ProducerConf, ExProducer]()
+
+ /**
+ * Retrieve a [[KafkaProducer]] in the cache or create a new one
+ * @param producerConfig producer configuration for creating [[KafkaProducer]]
+ * @return a [[KafkaProducer]] already in the cache or a new one
+ */
+ def getProducer[K, V](producerConfig: Map[String, Object]): KafkaProducer[K, V] =
+ try {
+ cache.get(mapToSeq(producerConfig), new Callable[KafkaProducer[K, V]] {
+ override def call(): KafkaProducer[K, V] = new KafkaProducer[K, V](producerConfig.asJava)
+ }).asInstanceOf[KafkaProducer[K, V]]
+ } catch {
+ case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
+ if e.getCause != null => throw e.getCause
+ }
+
+ /**
+ * Flush and close the [[KafkaProducer]] in the cache associated with this config
+ * @param producerConfig producer configuration associated to a [[KafkaProducer]]
+ */
+ def close(producerConfig: Map[String, Object]): Unit = cache.invalidate(mapToSeq(producerConfig))
+
+ private def mapToSeq(m: Map[String, Object]): Seq[(String, Object)] = m.toSeq.sortBy(_._1)
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
new file mode 100644
index 0000000..90d2bb1
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used to write DStreams, RDDs and Datasets to Kafka
+ *
+ * Example usage:
+ * {{{
+ * import com.github.benfradet.spark.kafka.writer.KafkaWriter._
+ * import org.apache.kafka.common.serialization.StringSerializer
+ *
+ * val topic = "my-topic"
+ * val producerConfig = Map(
+ * "bootstrap.servers" -> "127.0.0.1:9092",
+ * "key.serializer" -> classOf[StringSerializer].getName,
+ * "value.serializer" -> classOf[StringSerializer].getName
+ * )
+ *
+ * val dStream: DStream[String] = ...
+ * dStream.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](topic, s)
+ * )
+ *
+ * val rdd: RDD[String] = ...
+ * rdd.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](localTopic, s)
+ * )
+ *
+ * val dataset: Dataset[Foo] = ...
+ * dataset.writeToKafka(
+ * producerConfig,
+ * f => new ProducerRecord[String, String](localTopic, f.toString)
+ * )
+ *
+ * val dataFrame: DataFrame = ...
+ * dataFrame.writeToKafka(
+ * producerConfig,
+ * r => new ProducerRecord[String, String](localTopic, r.get(0).toString)
+ * )
+ * }}}
+ * It is also possible to provide a callback for each write to Kafka.
+ *
+ * This is optional and has a value of None by default.
+ *
+ * Example Usage:
+ * {{{
+ * @transient val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")
+ *
+ * val dStream: DStream[String] = ...
+ * dStream.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](topic, s),
+ * Some(new Callback with Serializable {
+ * override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
+ * if (Option(e).isDefined) {
+ * log.warn("error sending message", e)
+ * } else {
+ * log.info(s"write succeeded, offset: ${metadata.offset()")
+ * }
+ * }
+ * })
+ * )
+ * }}}
+ */
+abstract class KafkaWriter[T: ClassTag] extends Serializable {
+ /**
+ * Write a DStream, RDD, or Dataset to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+ * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+ * @param callback an optional [[Callback]] to be called after each write, default value is None.
+ */
+ def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
new file mode 100644
index 0000000..c6f6133
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[RDD]]s to Kafka
+ * @param rdd [[RDD]] to be written to Kafka
+ */
+class RDDKafkaWriter[T: ClassTag](@transient private val rdd: RDD[T])
+ extends KafkaWriter[T] with Serializable {
+ /**
+ * Write a [[RDD]] to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+ * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+ * @param callback an optional [[Callback]] to be called after each write, default value is None.
+ */
+ override def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit =
+ rdd.foreachPartition { partition =>
+ val producer = KafkaProducerCache.getProducer[K, V](producerConfig)
+ partition
+ .map(transformFunc)
+ .foreach(record => producer.send(record, callback.orNull))
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
new file mode 100644
index 0000000..04f5ba7
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/** Implicit conversions between
+ * - [[DStream]] -> [[KafkaWriter]]
+ * - [[RDD]] -> [[KafkaWriter]]
+ * - [[Dataset]] -> [[KafkaWriter]]
+ * - [[DataFrame]] -> [[KafkaWriter]]
+ */
+package object writer {
+ /**
+ * Convert a [[DStream]] to a [[KafkaWriter]] implicitly
+ * @param dStream [[DStream]] to be converted
+ * @return [[KafkaWriter]] ready to write messages to Kafka
+ */
+ implicit def dStreamToKafkaWriter[T: ClassTag, K, V](dStream: DStream[T]): KafkaWriter[T] =
+ new DStreamKafkaWriter[T](dStream)
+
+ /**
+ * Convert a [[RDD]] to a [[KafkaWriter]] implicitly
+ * @param rdd [[RDD]] to be converted
+ * @return [[KafkaWriter]] ready to write messages to Kafka
+ */
+ implicit def rddToKafkaWriter[T: ClassTag, K, V](rdd: RDD[T]): KafkaWriter[T] =
+ new RDDKafkaWriter[T](rdd)
+
+}
diff --git a/pnda-ztt-app/src/test/resources/application.properties b/pnda-ztt-app/src/test/resources/application.properties
new file mode 100644
index 0000000..c99a8ad
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/application.properties
@@ -0,0 +1,8 @@
+app.job_name=collectd.tsdb
+kafka.brokers=localhost:9092
+kafka.zookeeper=localhost:2181
+app.checkpoint_path=/tmp
+app.processing_parallelism=1
+app.batch_size_seconds=5
+kafka.consume_from_beginning=false
+opentsdb.ip=localhost:4242
diff --git a/pnda-ztt-app/src/test/resources/log4j.testing.properties b/pnda-ztt-app/src/test/resources/log4j.testing.properties
new file mode 100644
index 0000000..c3b266c
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/log4j.testing.properties
@@ -0,0 +1,14 @@
+log4j.rootCategory=WARN,console
+log4j.logger.org=WARN
+log4j.logger.com.cisco.pnda=DEBUG,console
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=DEBUG,console
+log4j.additivity.com.cisco.ztt=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.immediateFlush=true
+log4j.appender.console.encoding=UTF-8
+
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d %-5p %c - %m%n
diff --git a/pnda-ztt-app/src/test/resources/meta/test-one.yaml b/pnda-ztt-app/src/test/resources/meta/test-one.yaml
new file mode 100644
index 0000000..9900043
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/meta/test-one.yaml
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/resources/meta/test-three.yaml b/pnda-ztt-app/src/test/resources/meta/test-three.yaml
new file mode 100644
index 0000000..9900043
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/meta/test-three.yaml
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/resources/meta/test-two.yaml b/pnda-ztt-app/src/test/resources/meta/test-two.yaml
new file mode 100644
index 0000000..9900043
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/meta/test-two.yaml
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala
new file mode 100644
index 0000000..c08ae2f
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.telemetry.xr
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.json4s.JsonAST.JArray
+import com.cisco.ztt.cisco.xr.telemetry.JsonParser
+
+class JsonParserTest extends FlatSpec with Matchers {
+ val cpu_json = """
+{
+ "Source": "172.16.1.157:27059",
+ "Telemetry": {
+ "node_id_str": "IOS-XRv9k-edge-1",
+ "subscription_id_str": "cpu",
+ "encoding_path": "Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization",
+ "collection_id": 265673,
+ "collection_start_time": 1505905434090,
+ "msg_timestamp": 1505905434090,
+ "collection_end_time": 1505905434103
+ },
+ "Rows": [
+ {
+ "Timestamp": 1505905434099,
+ "Keys": {
+ "node-name": "0/RP0/CPU0"
+ },
+ "Content": {
+ "process-cpu_PIPELINE_EDIT": [
+ {
+ "process-cpu-fifteen-minute": 0,
+ "process-cpu-five-minute": 0,
+ "process-cpu-one-minute": 0,
+ "process-id": 1,
+ "process-name": "init"
+ },
+ {
+ "process-cpu-fifteen-minute": 0,
+ "process-cpu-five-minute": 0,
+ "process-cpu-one-minute": 0,
+ "process-id": 1544,
+ "process-name": "bash"
+ },
+ {
+ "process-cpu-fifteen-minute": 0,
+ "process-cpu-five-minute": 0,
+ "process-cpu-one-minute": 0,
+ "process-id": 26436,
+ "process-name": "sleep"
+ }
+ ],
+ "total-cpu-fifteen-minute": 6,
+ "total-cpu-five-minute": 6,
+ "total-cpu-one-minute": 6
+ }
+ }
+ ]
+}
+ """
+
+ "JsonParser" should "successfully parse cpu telemetry JSON" in {
+ val event = JsonParser.parse(cpu_json)
+
+ event.Telemetry.subscription_id_str should be ("cpu")
+ event.Rows(0).Keys.size should be (1)
+
+ val subrows = event.Rows(0).Content("process-cpu_PIPELINE_EDIT")
+ val extracted = JsonParser.array(subrows)
+
+ extracted.size should be (3)
+ extracted(0).size should be (5)
+ }
+
+ val null_keys = """
+{
+ "Source": "172.16.1.157:49227",
+ "Telemetry": {
+ "node_id_str": "IOS-XRv9k-edge-1",
+ "subscription_id_str": "Logging",
+ "encoding_path": "Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics",
+ "collection_id": 925712,
+ "collection_start_time": 1507552918199,
+ "msg_timestamp": 1507552918199,
+ "collection_end_time": 1507552918203
+ },
+ "Rows": [
+ {
+ "Timestamp": 1507552918201,
+ "Keys": null,
+ "Content": {
+ "buffer-logging-stats": {
+ "buffer-size": 2097152,
+ "is-log-enabled": "true",
+ "message-count": 221,
+ "severity": "message-severity-debug"
+ }
+ }
+ }
+ ]
+}
+ """
+
+ it should "successfully parse JSON with null keys" in {
+ val event = JsonParser.parse(null_keys)
+
+ event.Rows(0).Keys.size should be (0)
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala
new file mode 100644
index 0000000..b133c9e
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.telemetry.xr
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import com.cisco.ztt.cisco.xr.telemetry.TimeseriesMapper
+import com.cisco.ztt.meta.YamlReader
+import com.cisco.ztt.cisco.xr.telemetry.JsonParser
+
+class TelemetryMapperTest extends FlatSpec with Matchers {
+
+ val ipv6_yaml = """
+input_topic: telemetry.avro
+output_topic: timeseries
+timeseries_namespace: ipv6
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: ipv6.total-packets
+ display_name: "Total IPV6 Packets"
+ ts_name: total-ipv6-packets
+
+ - name: icmp.total-messages
+ display_name: "Total ICMP Messages"
+"""
+
+ val ipv6_json = """
+{
+ "Source": "172.16.1.157:56197",
+ "Telemetry": {
+ "node_id_str": "IOS-XRv9k-edge-1",
+ "subscription_id_str": "ipv6-io",
+ "encoding_path": "Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic",
+ "collection_id": 282962,
+ "collection_start_time": 1506008887021,
+ "msg_timestamp": 1506008887021,
+ "collection_end_time": 1506008887039
+ },
+ "Rows": [
+ {
+ "Timestamp": 1506008887031,
+ "Keys": {
+ "node-name": "0/RP0/CPU0"
+ },
+ "Content": {
+ "icmp": {
+ "output-messages": 0,
+ "total-messages": 4,
+ "unknown-error-type-messages": 0
+ },
+ "ipv6": {
+ "bad-header-packets": 0,
+ "total-packets": 12,
+ "unknown-protocol-packets": 0
+ },
+ "ipv6-node-discovery": {
+ "received-redirect-messages": 0,
+ "sent-redirect-messages": 0
+ }
+ }
+ }
+ ]
+}
+"""
+
+ "TelemetryMapper" should "Map to wanted timeseries values" in {
+ val meta = YamlReader.parse(ipv6_yaml)
+ val mapper = new TimeseriesMapper(meta.xr_telemetry(0), "demo")
+
+ val event = JsonParser.parse(ipv6_json)
+ val timeseries = mapper.transform(event).toList
+
+ timeseries.length should be (2)
+ timeseries(0).metric should be ("demo.icmp.total-messages")
+ timeseries(1).metric should be ("demo.total-ipv6-packets")
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala
new file mode 100644
index 0000000..2ee1749
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.scalatest.Matchers
+import org.scalatest.FlatSpec
+import com.cisco.ztt.TimeseriesDatapoint
+import java.io.StringWriter
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+class DatapointTest extends FlatSpec with Matchers {
+
+ "TimeseriesDatapoint" should "serialize to json" in {
+ val data = Array(
+ new TimeseriesDatapoint("packets-in", "5", "1000000",
+ Map("host" -> "host1", "inteface" -> "GigabitEthernet0/0/0/1")),
+ new TimeseriesDatapoint("packets-out", "5", "1000000",
+ Map("host" -> "host1", "inteface" -> "GigabitEthernet0/0/0/1"))
+ )
+
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, data)
+ val json = out.toString()
+ }
+
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala
new file mode 100644
index 0000000..eabe54e
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+class YamlReaderTest extends FlatSpec with Matchers {
+
+ val interface_yaml = """
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
+
+ - name: packets-received
+ display_name: "Packets Received"
+
+ - name: packets-sent
+ display_name: "Packets Sent"
+"""
+
+ "YamlReader" should "successfully parse YAML text" in {
+ val meta = YamlReader.parse(interface_yaml)
+
+ meta.input_topic should be ("telemetry")
+ meta.output_topic should be ("timeseries")
+ meta.xr_telemetry.length should be (1)
+ meta.xr_telemetry(0).keys.get.length should be(1)
+ meta.xr_telemetry(0).content.length should be(4)
+ }
+
+ it should "read multiple files from classpath" in {
+ val meta = YamlReader.load("classpath*:meta/test-*.yaml")
+ meta.units.length should be (3)
+ }
+}
diff --git a/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala
new file mode 100644
index 0000000..58f87fe
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ves.telemetry
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.YamlReader
+import com.cisco.ztt.ves.telemetry.VesTransformer
+import org.apache.log4j.BasicConfigurator
+
+class VesTransformerTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+ before {
+ BasicConfigurator.configure();
+ }
+
+ val yaml = """
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: nic
+
+ves_telemetry:
+ - path: measurementsForVfScalingFields/vNicUsageArray
+ keys:
+ - name: vNicIdentifier
+ content:
+ - name: receivedTotalPacketsDelta
+"""
+ val payload = """{
+ "event": {
+ "commonEventHeader": {
+ "startEpochMicrosec": 1537258053361276,
+ "sourceId": "bab0c4de-cfb8-4a0d-b7e4-a3d4020bb667",
+ "eventId": "TrafficStats_1.2.3.4",
+ "nfcNamingCode": "vVNF",
+ "reportingEntityId": "No UUID available",
+ "internalHeaderFields": {
+ "collectorTimeStamp": "Tue, 09 18 2018 08:01:41 GMT"
+ },
+ "eventType": "HTTP request rate",
+ "priority": "Normal",
+ "version": 3,
+ "reportingEntityName": "fwll",
+ "sequence": 6108,
+ "domain": "measurementsForVfScaling",
+ "lastEpochMicrosec": 1537258063557408,
+ "eventName": "vFirewallBroadcastPackets",
+ "sourceName": "vFW_SINK_VNF2",
+ "nfNamingCode": "vVNF"
+ },
+ "measurementsForVfScalingFields": {
+ "cpuUsageArray": [
+ {
+ "percentUsage": 0,
+ "cpuIdentifier": "cpu1",
+ "cpuIdle": 100,
+ "cpuUsageSystem": 0,
+ "cpuUsageUser": 0
+ }
+ ],
+ "measurementInterval": 10,
+ "requestRate": 9959,
+ "vNicUsageArray": [
+ {
+ "transmittedOctetsDelta": 0,
+ "receivedTotalPacketsDelta": 0,
+ "vNicIdentifier": "eth0",
+ "valuesAreSuspect": "true",
+ "transmittedTotalPacketsDelta": 0,
+ "receivedOctetsDelta": 0
+ }
+ ],
+ "measurementsForVfScalingVersion": 2.1
+ }
+ }
+}
+"""
+ "VesTransformer" should "successfully transform a VES event" in {
+ val unit = YamlReader.parse(yaml)
+ val ves = new VesTransformer(unit)
+
+ val event = new DataPlatformEvent("src", System.currentTimeMillis(), "127.0.0.1", payload)
+ val (status, result) = ves.transform(event)
+
+ status should be(true)
+ result.toList.length should be (1)
+ result.toList(0).datapoint.metric should be ("nic.receivedTotalPacketsDelta")
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore
new file mode 100644
index 0000000..d392f0e
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore
@@ -0,0 +1 @@
+*.jar
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties
new file mode 100644
index 0000000..8c1b965
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties
@@ -0,0 +1,8 @@
+kafka.brokers=${environment_kafka_brokers}
+kafka.zookeeper=${environment_kafka_zookeeper}
+app.checkpoint_path=${component_checkpoint_path}
+app.job_name=${component_job_name}
+app.processing_parallelism=${component_processing_parallelism}
+app.batch_size_seconds=${component_batch_size_seconds}
+kafka.consume_from_beginning=${component_consume_from_beginning}
+opentsdb.ip=${environment_opentsdb} \ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties
new file mode 100644
index 0000000..6a3cab1
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=ERROR,rolling
+log4j.logger.com.cisco.pnda=${component.log_level},rolling
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=DEBUG,rolling
+log4j.additivity.com.cisco.ztt=false
+
+log4j.appender.rolling=org.apache.log4j.RollingFileAppender
+log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
+log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
+log4j.appender.rolling.maxFileSize=50MB
+log4j.appender.rolling.maxBackupIndex=1
+log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/spark.log
+log4j.appender.rolling.encoding=UTF-8
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json
new file mode 100644
index 0000000..1029987
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json
@@ -0,0 +1,8 @@
+[
+ {
+ "name": "nic.receivedTotalPacketsDelta"
+ },
+ {
+ "name": "cpu.percentUsage"
+ }
+] \ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
new file mode 100644
index 0000000..2f8ab6a
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
@@ -0,0 +1,10 @@
+{
+ "main_jar": "PndaZTTApp.jar",
+ "main_class": "com.cisco.ztt.App",
+ "log_level": "INFO",
+ "batch_size_seconds" : "2",
+ "processing_parallelism" : "1",
+ "checkpoint_path": "",
+ "input_topic": "ves.avro",
+ "consume_from_beginning": "false"
+}
diff --git a/pnda-ztt-app/test/application.properties b/pnda-ztt-app/test/application.properties
new file mode 100644
index 0000000..e7a2923
--- /dev/null
+++ b/pnda-ztt-app/test/application.properties
@@ -0,0 +1,8 @@
+kafka.brokers=192.168.10.5:9092
+kafka.zookeeper=192.168.10.5:2181
+app.checkpoint_path=
+app.job_name=Zomg
+app.processing_parallelism=1
+app.batch_size_seconds=2
+kafka.consume_from_beginning=false
+opentsdb.ip=
diff --git a/pnda-ztt-app/test/log4j.testing.properties b/pnda-ztt-app/test/log4j.testing.properties
new file mode 100644
index 0000000..1388b20
--- /dev/null
+++ b/pnda-ztt-app/test/log4j.testing.properties
@@ -0,0 +1,14 @@
+log4j.rootCategory=TRACE,console
+log4j.logger.org=TRACE
+log4j.logger.com.cisco.pnda=TRACE,console
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=TRACE,console
+log4j.additivity.com.cisco.ztt=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.immediateFlush=true
+log4j.appender.console.encoding=UTF-8
+
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d %-5p %c - %m%n
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..6b193d5
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+================================================================================
+
+dcae-analytics-pnda
+
+================================================================================
+Copyright (c) 2017 Cisco Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.oparent</groupId>
+ <artifactId>oparent</artifactId>
+ <version>1.2.1</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>org.onap.dcaegen2.analytics.pnda</groupId>
+ <artifactId>pnda-utils</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>pnda-ztt-app</module>
+ </modules>
+
+</project>