summaryrefslogtreecommitdiffstats
path: root/pnda-ztt-app/src
diff options
context:
space:
mode:
Diffstat (limited to 'pnda-ztt-app/src')
-rw-r--r--pnda-ztt-app/src/main/resources/dataplatform-raw.avsc10
-rw-r--r--pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml17
-rw-r--r--pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml19
-rw-r--r--pnda-ztt-app/src/main/resources/meta/fib-summary.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml36
-rw-r--r--pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml16
-rw-r--r--pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml58
-rw-r--r--pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml23
-rw-r--r--pnda-ztt-app/src/main/resources/meta/logging-stats.yaml11
-rw-r--r--pnda-ztt-app/src/main/resources/meta/memory-detail.yaml31
-rw-r--r--pnda-ztt-app/src/main/resources/meta/memory-summary.yaml25
-rw-r--r--pnda-ztt-app/src/main/resources/meta/rib-oper.yaml27
-rw-r--r--pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml12
-rw-r--r--pnda-ztt-app/src/main/resources/meta/ves-nic.yaml12
-rw-r--r--pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml27
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala93
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java106
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java106
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java54
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala63
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java63
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala81
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala70
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala110
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala25
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala24
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala35
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala24
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala99
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala32
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala62
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala22
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala32
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala70
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala76
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala26
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala74
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala48
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala104
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala50
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala71
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala103
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala52
-rw-r--r--pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala52
-rw-r--r--pnda-ztt-app/src/test/resources/application.properties8
-rw-r--r--pnda-ztt-app/src/test/resources/log4j.testing.properties14
-rw-r--r--pnda-ztt-app/src/test/resources/meta/test-one.yaml14
-rw-r--r--pnda-ztt-app/src/test/resources/meta/test-three.yaml14
-rw-r--r--pnda-ztt-app/src/test/resources/meta/test-two.yaml14
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala122
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala95
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala43
-rw-r--r--pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala61
-rw-r--r--pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala106
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore1
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties8
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties13
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json8
-rw-r--r--pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json10
61 files changed, 2763 insertions, 0 deletions
diff --git a/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc
new file mode 100644
index 0000000..5450771
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc
@@ -0,0 +1,10 @@
+{"namespace": "com.cisco.pnda",
+ "type": "record",
+ "name": "PndaRecord",
+ "fields": [
+ {"name": "timestamp", "type": "long"},
+ {"name": "src", "type": "string"},
+ {"name": "host_ip", "type": "string"},
+ {"name": "rawdata", "type": "bytes"}
+ ]
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml
new file mode 100644
index 0000000..5d56f77
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml
@@ -0,0 +1,17 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: bgp
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ipv4-bgp-oper:bgp/instances/instance/instance-active/default-vrf/afs/af/neighbor-af-table/neighbor
+ keys:
+ - name: af-name
+ display_name: "Address Family Name"
+ - name: instance-name
+ display_name: "Instance Name"
+ - name: neighbor-address
+ display_name: "Neighbor Address"
+ content:
+ - name: connection-up-count
+ display_name: "Connection Up Count"
diff --git a/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml
new file mode 100644
index 0000000..0ce8870
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml
@@ -0,0 +1,19 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: cpu
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: total-cpu-one-minute
+ display_name: "One-minute CPU Total"
+
+ - name: total-cpu-five-minute
+ display_name: "Five-minute CPU Total"
+
+ - name: total-cpu-fifteen-minute
+ display_name: "Fifteen-minute CPU Total"
diff --git a/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml
new file mode 100644
index 0000000..2dbb4ae
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml
@@ -0,0 +1,27 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: fib
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/fib-summaries/fib-summary
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ - name: protocol-name
+ display_name: "Protocol Name"
+ - name: vrf-name
+ display_name: "VRF Name"
+ content:
+ - name: extended-prefixes
+ display_name: "Num Extended Prefixes"
+
+ - name: forwarding-elements
+ display_name: "Num Forwarding Elements"
+
+ - name: next-hops
+ display_name: "Num Next Hops"
+
+ - name: routes
+ display_name: "Num Routes"
diff --git a/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml
new file mode 100644
index 0000000..24fb808
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml
@@ -0,0 +1,36 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: interface
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+ ts_name: bytes-in # rename the metric in OpenTSDB
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
+ ts_name: bytes-out # rename the metric in OpenTSDB
+
+ - name: packets-received
+ display_name: "Packets Received"
+
+ - name: packets-sent
+ display_name: "Packets Sent"
+
+ - name: broadcast-packets-received
+ display_name: "Broadcast Packets Received"
+
+ - name: broadcast-packets-sent
+ display_name: "Broadcast Packets Sent"
+
+ - name: multicast-packets-received
+ display_name: "Multicast Packets Received"
+
+ - name: multicast-packets-sent
+ display_name: "Multicast Packets Sent"
diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml
new file mode 100644
index 0000000..3bc9689
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-invmgr-oper:inventory/entities/entity/attributes/inv-basic-bag
+ keys:
+ - name: name
+ display_name: "Entity Name"
+ content:
+ - name: serial-number
+ display_name: "Serial Number"
+
+ - name: description
+ display_name: "Description"
+
+ - name: manufacturer-name
+ display_name: "Manufacturer"
+
+ - name: model-name
+ display_name: "Model Name"
+
+ - name: software-revision
+ display_name: "Software Revision"
+
+ - name: vendor-type
+ display_name: "Vendor OID"
diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml
new file mode 100644
index 0000000..db6386d
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-invmgr-oper:inventory/racks/rack/attributes/inv-basic-bag
+ keys:
+ - name: name
+ display_name: "Entity Name"
+ content:
+ - name: serial-number
+ display_name: "Serial Number"
+
+ - name: description
+ display_name: "Description"
+
+ - name: manufacturer-name
+ display_name: "Manufacturer"
+
+ - name: model-name
+ display_name: "Model Name"
+
+ - name: software-revision
+ display_name: "Software Revision"
+
+ - name: vendor-type
+ display_name: "Vendor OID"
diff --git a/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml
new file mode 100644
index 0000000..3a5dcba
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml
@@ -0,0 +1,16 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: ipv6
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: ipv6.total-packets
+ display_name: "Total IPV6 Packets"
+
+ - name: icmp.total-messages
+ display_name: "Total ICMP Messages"
diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml
new file mode 100644
index 0000000..2360de6
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml
@@ -0,0 +1,58 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: inventory
+output_topic: inventory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/summaries/summary
+ keys:
+ - name: device-id
+ display_name: "Device Id"
+ - name: interface-name
+ display_name: "Interface Name"
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: chassis-id
+ display_name: "Chassis Id"
+
+ - name: device-id
+ display_name: "Device Id"
+
+ - name: port-id-detail
+ display_name: "Port Id"
+
+ - name: receiving-interface-name
+ display_name: "Receiving Interface Name"
+
+ - name: enabled-capabilities
+ display_name: "Enabled Capabilities"
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device
+ keys:
+ - name: device-id
+ display_name: "Device Id"
+ - name: interface-name
+ display_name: "Interface Name"
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: lldp-neighbor.chassis-id
+ display_name: "Chassis Id"
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/details/detail
+ keys:
+ - name: device-id
+ display_name: "Device Id"
+ - name: interface-name
+ display_name: "Interface Name"
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: lldp-neighbor.chassis-id
+ display_name: "Chassis Id"
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/interfaces/interface
+ keys:
+ - name: interface-name
+ - name: node-name
+ content:
+ - name: interface-name
+ - name: if-index
diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml
new file mode 100644
index 0000000..e29e32c
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml
@@ -0,0 +1,23 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: lldp.stats
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/statistics
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: received-packets
+ - name: transmitted-packets
+ - name: aged-out-entries
+ - name: bad-packets
+ - name: discarded-packets
+ - name: discarded-tl-vs
+ - name: encapsulation-errors
+ - name: out-of-memory-errors
+ - name: queue-overflow-errors
+ - name: table-overflow-errors
+ - name: unrecognized-tl-vs
diff --git a/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml
new file mode 100644
index 0000000..5d12152
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml
@@ -0,0 +1,11 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: logging
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics
+ content:
+ - name: buffer-logging-stats.message-count
+ ts_name: message-count
+ display_name: "Serial Number"
diff --git a/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml
new file mode 100644
index 0000000..6b3f657
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml
@@ -0,0 +1,31 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: memory
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/detail
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: allocated-memory
+ display_name: "Allocated Memory"
+
+ - name: free-application-memory
+ display_name: "Free Application Memory"
+
+ - name: free-physical-memory
+ display_name: "Free Physical Memory"
+
+ - name: ram-memory
+ display_name: "RAM Memory"
+
+ - name: program-data
+ display_name: "Program Data"
+
+ - name: program-stack
+ display_name: "Program Stack"
+
+ - name: program-text
+ display_name: "Program Text"
diff --git a/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml
new file mode 100644
index 0000000..02adef6
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml
@@ -0,0 +1,25 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: memory.summary
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/summary
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: allocated-memory
+ display_name: "Allocated Memory"
+
+ - name: free-application-memory
+ display_name: "Free Application Memory"
+
+ - name: free-physical-memory
+ display_name: "Free Physical Memory"
+
+ - name: ram-memory
+ display_name: "RAM Memory"
+
+ - name: system-ram-memory
+ display_name: "System RAM Memopry"
diff --git a/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml
new file mode 100644
index 0000000..197b0b9
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml
@@ -0,0 +1,27 @@
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: rib
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ip-rib-ipv4-oper:rib/vrfs/vrf/afs/af/safs/saf/ip-rib-route-table-names/ip-rib-route-table-name/protocol/bgp/as/information
+ keys:
+ - name: af-name
+ display_name: "Address Family Name"
+ - name: as
+ display_name: "Address Family"
+ - name: route-table-name
+ display_name: "Route table name"
+ - name: saf-name
+ display_name: "Saf name"
+ - name: vrf-name
+ display_name: "Vrf name"
+ content:
+ - name: active-routes-count
+ display_name: "Active Routes Count"
+ - name: instance
+ display_name: "Instance"
+ - name: paths-count
+ display_name: "Paths Count"
+ - name: routes-counts
+ display_name: "Routes Count"
diff --git a/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml
new file mode 100644
index 0000000..aaf1de8
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml
@@ -0,0 +1,12 @@
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: cpu
+
+ves_telemetry:
+ - path: measurementsForVfScalingFields/cpuUsageArray
+ keys:
+ - name: cpuIdentifier
+ content:
+ - name: percentUsage
diff --git a/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml
new file mode 100644
index 0000000..a6ae3de
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml
@@ -0,0 +1,12 @@
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: nic
+
+ves_telemetry:
+ - path: measurementsForVfScalingFields/vNicUsageArray
+ keys:
+ - name: vNicIdentifier
+ content:
+ - name: receivedTotalPacketsDelta
diff --git a/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml
new file mode 100644
index 0000000..df466c3
--- /dev/null
+++ b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml
@@ -0,0 +1,27 @@
+format: cisco.xr.telemetry
+input_topic: telemetry.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: fib.vrf
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/vrfs/vrf/summary
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ - name: protocol-name
+ display_name: "Protocol Name"
+ - name: vrf-name
+ display_name: "VRF Name"
+ content:
+ - name: extended-prefixes
+ display_name: "Num Extended Prefixes"
+
+ - name: forwarding-elements
+ display_name: "Num Forwarding Elements"
+
+ - name: next-hops
+ display_name: "Num Next Hops"
+
+ - name: routes
+ display_name: "Num Routes"
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
new file mode 100644
index 0000000..b143b62
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: StatReporter
+ * Purpose: Report batch processing metrics to the PNDA metric logger
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.pnda
+
+import scala.util.control.NonFatal
+import java.io.StringWriter
+import java.io.PrintWriter
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
+import org.apache.log4j.Logger
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+
+class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
+
+ private[this] val logger = Logger.getLogger(getClass().getName())
+
+ override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
+ def doSend(metricName: String, metricValue: String) = {
+ try {
+ val httpClient = new DefaultHttpClient()
+ val post = new HttpPost(metricsUrl)
+ post.setHeader("Content-type", "application/json")
+ val ts = java.lang.System.currentTimeMillis()
+ val body = f"""{
+ | "data": [{
+ | "source": "application.$appName",
+ | "metric": "application.kpi.$appName.$metricName",
+ | "value": "$metricValue",
+ | "timestamp": $ts%d
+ | }],
+ | "timestamp": $ts%d
+ |}""".stripMargin
+
+ logger.debug(body)
+ post.setEntity(new StringEntity(body))
+ val response = httpClient.execute(post)
+ if (response.getStatusLine.getStatusCode() != 200) {
+ logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode())
+ }
+
+ } catch {
+ case NonFatal(t) => {
+ logger.error("POST failed: " + metricsUrl)
+ val sw = new StringWriter
+ t.printStackTrace(new PrintWriter(sw))
+ logger.error(sw.toString)
+ }
+ }
+ }
+ doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
+ doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
+ doSend("num-records", batch.batchInfo.numRecords.toString())
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
new file mode 100644
index 0000000..1a08bd4
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEvent
+ * Purpose: Data model class for an avro event on Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class DataPlatformEvent implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+ protected static ObjectMapper _mapper = new ObjectMapper();
+
+ private String _src;
+ private Long _timestamp;
+ private String _hostIp;
+ private String _rawdata;
+
+ public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata)
+ {
+ _src = src;
+ _timestamp = timestamp;
+ _hostIp = host_ip;
+ _rawdata = rawdata;
+ }
+
+ public String getSrc()
+ {
+ return _src;
+ }
+
+ public Long getTimestamp()
+ {
+ return _timestamp;
+ }
+
+ public String getHostIp()
+ {
+ return _hostIp;
+ }
+
+ public String getRawdata()
+ {
+ return _rawdata;
+ }
+
+ @Override
+ public String toString()
+ {
+ try
+ {
+ return _mapper.writeValueAsString(this);
+ }
+ catch (Exception ex)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ boolean result = false;
+ if (other instanceof DataPlatformEvent)
+ {
+ DataPlatformEvent that = (DataPlatformEvent) other;
+ result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc())))
+ && (this.getTimestamp() == that.getTimestamp() || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
+ && (this.getHostIp() == that.getHostIp() || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp())))
+ && (this.getRawdata() == that.getRawdata() || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata())));
+ }
+ return result;
+
+ }
+
+ public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException
+ {
+ return _mapper.readTree(_rawdata);
+ }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
new file mode 100644
index 0000000..ef2bc0d
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEventDecoder
+ * Purpose: Encodes and secodes binary event data from kafka to/from a DataPlatformEvent object
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.log4j.Logger;
+
+
+public class DataPlatformEventCodec
+{
+ private static final Logger LOGGER = Logger.getLogger(DataPlatformEventCodec.class.getName());
+
+ private Schema.Parser _parser = new Schema.Parser();
+ private DatumReader<GenericRecord> _reader;
+ private DatumWriter<GenericRecord> _writer;
+ private Schema _schema;
+ private String _schemaDef;
+
+ public DataPlatformEventCodec(String schemaDef) throws IOException
+ {
+ _schemaDef = schemaDef;
+ _schema = _parser.parse(schemaDef);
+ _reader = new GenericDatumReader<GenericRecord>(_schema);
+ _writer = new GenericDatumWriter<GenericRecord>(_schema);
+ }
+
+ public DataPlatformEvent decode(byte[] data) throws IOException
+ {
+ try
+ {
+ Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+ GenericRecord r = _reader.read(null, decoder);
+ return new DataPlatformEvent((String) r.get("src").toString(),
+ (Long) r.get("timestamp"),
+ (String) r.get("host_ip").toString(),
+ new String(((ByteBuffer)r.get("rawdata")).array()));
+ }
+ catch(Exception ex)
+ {
+ LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
+ throw new IOException(ex);
+ }
+ }
+
+ final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();
+ public static String hexStr(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for ( int j = 0; j < bytes.length; j++ ) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = hexArray[v >>> 4];
+ hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
+
+ public byte[] encode(DataPlatformEvent e) throws IOException
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ GenericRecord datum = new GenericData.Record(_schema);
+ datum.put("src", e.getSrc());
+ datum.put("timestamp", e.getTimestamp());
+ datum.put("host_ip", e.getHostIp());
+ datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
+ _writer.write(datum, encoder);
+ encoder.flush();
+ out.close();
+ return out.toByteArray();
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
new file mode 100644
index 0000000..24a9d35
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: StaticHelpers
+ * Purpose: Helper functions
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class StaticHelpers {
+ public static String loadResourceFile(String path)
+ {
+ InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
+ try
+ {
+ char[] buf = new char[2048];
+ Reader r = new InputStreamReader(is, "UTF-8");
+ StringBuilder s = new StringBuilder();
+ while (true)
+ {
+ int n = r.read(buf);
+ if (n < 0)
+ break;
+ s.append(buf, 0, n);
+ }
+ return s.toString();
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
new file mode 100644
index 0000000..6bc2083
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import com.cisco.pnda.StatReporter
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.YamlReader
+import org.apache.log4j.BasicConfigurator
+
+object App {
+
+ private[this] val logger = Logger.getLogger(getClass().getName())
+
+ def main(args: Array[String]) {
+
+ BasicConfigurator.configure();
+
+ val props = AppConfig.loadProperties();
+ val loggerUrl = props.getProperty("environment.metric_logger_url")
+ val appName = props.getProperty("component.application")
+ val checkpointDirectory = props.getProperty("app.checkpoint_path");
+ val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+ val metadata = YamlReader.load()
+ if (metadata.units.length == 0) {
+ logger.error("Trying to run app without metadata")
+ System.exit(1)
+ }
+ val pipeline = new ZttPipeline(metadata)
+
+ // Create the streaming context, or load a saved one from disk
+ val ssc = if (checkpointDirectory.length() > 0)
+ StreamingContext.getOrCreate(checkpointDirectory, pipeline.create) else pipeline.create();
+
+ sys.ShutdownHookThread {
+ logger.info("Gracefully stopping Spark Streaming Application")
+ ssc.stop(true, true)
+ logger.info("Application stopped")
+ }
+
+ if (loggerUrl != null) {
+ logger.info("Reporting stats to url: " + loggerUrl)
+ ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+ }
+ logger.info("Starting spark streaming execution")
+ ssc.start()
+ ssc.awaitTermination()
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
new file mode 100644
index 0000000..a711cab
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: AppConfig
+ * Purpose: Load application properties file.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.ztt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class AppConfig
+{
+ private static final Logger LOGGER = Logger.getLogger(AppConfig.class);
+
+ private static Properties _properties;
+ private static Object _lock = new Object();
+
+ public static Properties loadProperties()
+ {
+ synchronized (_lock)
+ {
+ if (_properties == null)
+ {
+ _properties = new Properties();
+ try
+ {
+ InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties");
+ _properties.load(is);
+ is.close();
+ LOGGER.info("Properties loaded");
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Failed to load properties", e);
+ System.exit(1);
+ }
+ }
+ return _properties;
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
new file mode 100644
index 0000000..c0bc61e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: KafkaInput
+ * Purpose: Generate a dstream from Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.StringDecoder
+import org.apache.log4j.Logger
+
+class KafkaInput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def readFromKafka(ssc: StreamingContext, topic: String) = {
+ val props = AppConfig.loadProperties();
+ val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+ if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
+ kafkaParams.put("auto.offset.reset", "smallest");
+ }
+
+ Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+ Holder.logger.info("Registering with kafka using topic " + topic)
+
+ val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
+ ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+
+ // Decode avro container format
+ val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+ val rawMessages = messages.map(x => {
+ val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
+ val payload = x._2;
+ val dataPlatformEvent = eventDecoder.decode(payload);
+ dataPlatformEvent;
+ });
+ rawMessages;
+ };
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
new file mode 100644
index 0000000..1041d00
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+
+
+
+import java.io.StringWriter
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.github.benfradet.spark.kafka.writer.dStreamToKafkaWriter
+
+
+class KafkaOutput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def writeToKafka(output: DStream[Payload]) = {
+
+ val props = AppConfig.loadProperties();
+ val producerConfig = Map(
+ "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+ "key.serializer" -> classOf[StringSerializer].getName,
+ "value.serializer" -> classOf[ByteArraySerializer].getName
+)
+ output.writeToKafka(
+ producerConfig,
+ s => {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, s.datapoint)
+ val json = out.toString()
+
+ val event = new DataPlatformEvent(s.publishSrc, s.timestamp, s.hostIp, json)
+ val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+ val codec = new DataPlatformEventCodec(avroSchemaString);
+
+ new ProducerRecord[String, Array[Byte]](s.publishTopic, codec.encode(event))
+ }
+ )
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
new file mode 100644
index 0000000..9077986
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: OpenTSDBOutput
+ * Purpose: Write a dstream to OpenTSDB
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import scala.Iterator
+
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import scala.collection.mutable.ArrayBuffer
+
+class OpenTSDBOutput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def putOpentsdb[T](opentsdbIP: String,
+ input: DStream[Payload]): DStream[Payload] = {
+ input.mapPartitions(partition => {
+ var size = 0
+ val output = partition.grouped(20).flatMap(group => {
+ val data = ArrayBuffer[TimeseriesDatapoint]()
+ val passthru = group.map(item => {
+ data += item.datapoint
+ item
+ })
+
+ size += data.length
+
+ if (data.length > 0) {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, data)
+ val json = out.toString()
+
+ Holder.logger.debug("Posting " + data.length + " datapoints to OpenTSDB")
+ Holder.logger.debug(json)
+
+ if (opentsdbIP != null && opentsdbIP.length() > 0) {
+ val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
+ try {
+ val httpClient = new DefaultHttpClient()
+ val post = new HttpPost(openTSDBUrl)
+ post.setHeader("Content-type", "application/json")
+ post.setEntity(new StringEntity(json))
+ val response = httpClient.execute(post)
+ // Holder.logger.debug(EntityUtils.toString(response.getEntity()))
+ } catch {
+ case t: Throwable => {
+ Holder.logger.warn(t)
+ }
+ }
+ }
+ } else {
+ Holder.logger.debug("No datapoints to post to OpenTSDB")
+ }
+ passthru
+ })
+ output
+ });
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
new file mode 100644
index 0000000..2961bd0
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class Payload(
+ val publishSrc: String,
+ val publishTopic: String,
+ val hostIp: String,
+ val timestamp: Long,
+ val datapoint: TimeseriesDatapoint) {
+
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
new file mode 100644
index 0000000..7e447bf
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class TimeseriesDatapoint(
+ val metric: String,
+ val value: String,
+ val timestamp: String,
+ val tags: Map[String, String]) {
+
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
new file mode 100644
index 0000000..aae1e8c
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+import com.cisco.ztt.meta.Metadata
+import com.cisco.ztt.cisco.xr.telemetry.TelemetryDispatcher
+import com.cisco.ztt.ves.telemetry.VesTransformer
+
+class TransformManager(metadata: Metadata) {
+
+ val transformers: Array[Transformer] = metadata.units.map( unit => {
+
+ unit.format match {
+ case "ves" => new VesTransformer(unit)
+ case "cisco.xr.telemetry" => new TelemetryDispatcher(unit)
+ case _ => new TelemetryDispatcher(unit)
+ }
+ })
+
+ val byTopic = transformers.groupBy( t => { t.inputTopic })
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
new file mode 100644
index 0000000..861eb5e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import com.cisco.pnda.model.DataPlatformEvent
+import org.apache.spark.streaming.dstream.DStream
+
+trait Transformer {
+ def inputTopic: String
+ def transform(event: DataPlatformEvent): (Boolean, Set[Payload])
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
new file mode 100644
index 0000000..c3b3532
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: KafkaPipeline
+ * Purpose: Set up the spark streaming processing graph.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+import com.cisco.ztt.meta.Metadata
+import com.cisco.pnda.model.DataPlatformEvent
+
+class ZttPipeline(metadata: Metadata) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def create() = {
+ val props = AppConfig.loadProperties();
+ val checkpointDirectory = props.getProperty("app.checkpoint_path");
+ val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+ val sparkConf = new SparkConf();
+ Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
+ val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds));
+
+ if (checkpointDirectory.length() > 0) {
+ ssc.checkpoint(checkpointDirectory)
+ }
+
+ val transformManager = new TransformManager(metadata)
+ val streams = transformManager.byTopic.map( x => {
+ val topic = x._1
+ val transformers = x._2
+
+ val inputStream = new KafkaInput().readFromKafka(ssc, topic)
+
+ val timeseriesStream = inputStream.flatMap(dataPlatformEvent => {
+ var handled = false;
+ val datapoints = transformers.flatMap(transformer => {
+ val (ran, data) = transformer.transform(dataPlatformEvent)
+ handled |= ran;
+ data;
+ })
+ if (!handled) {
+ Holder.logger.info("Did not process " + dataPlatformEvent.getRawdata)
+ }
+ datapoints
+ })
+
+ val outputStream =
+ new OpenTSDBOutput().putOpentsdb(
+ props.getProperty("opentsdb.ip"),
+ timeseriesStream);
+
+ new KafkaOutput().writeToKafka(outputStream)
+ });
+
+ ssc;
+ }: StreamingContext
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
new file mode 100644
index 0000000..00470ff
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Telemetry
+
+class InventoryMapper(config: Telemetry) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+ Holder.logger.debug("InventoryMapper is sinking " + config.path)
+ Set[TimeseriesDatapoint]()
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
new file mode 100644
index 0000000..97f4da3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.JInt
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods
+import org.json4s.jvalue2extractable
+import org.json4s.string2JsonInput
+import scala.reflect.ManifestFactory.arrayType
+import scala.reflect.ManifestFactory.classType
+import org.json4s.JsonAST.JObject
+
+
+case class TRow(Timestamp: String, Keys: Option[Map[String, String]], Content: Map[String, JValue])
+case class THeader(node_id_str: String, subscription_id_str: String,
+ encoding_path: String, collection_id: Int, collection_start_time: Int,
+ msg_timestamp: Int, collection_end_time: Int)
+case class TEvent(Source: String, Telemetry: THeader, Rows: Array[TRow])
+
+
+object JsonParser {
+
+ def parse(json: String): TEvent = {
+ implicit val formats = DefaultFormats
+
+ val parsed = JsonMethods.parse(json)
+ val event = parsed.extract[TEvent]
+ event
+ }
+
+ def array(value: JValue): Array[Map[String,JValue]] = {
+ implicit val formats = DefaultFormats
+ val array = value.asInstanceOf[JArray]
+ array.extract[Array[Map[String, JValue]]]
+ }
+
+ def map(value: JValue): Map[String,JValue] = {
+ implicit val formats = DefaultFormats
+ val map = value.asInstanceOf[JObject]
+ map.extract[Map[String, JValue]]
+ }
+
+ def int(value: JValue): String = {
+ value.asInstanceOf[JInt].num.toString
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
new file mode 100644
index 0000000..2983e88
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+
+trait Mapper {
+ def transform(event: TEvent): Set[TimeseriesDatapoint]
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
new file mode 100644
index 0000000..9f9c775
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+
+
+class NullMapper(path: String) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+ Holder.logger.debug("NullMapper is sinking " + path)
+ Set[TimeseriesDatapoint]()
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
new file mode 100644
index 0000000..b7deadc
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.TimeseriesDatapoint
+import com.cisco.ztt.Transformer
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+
+
+class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
+ val transformers = unit.xr_telemetry.map( d => {
+ Holder.logger.warn("Choosing mapper for " + unit.processor)
+ unit.processor match {
+ case "timeseries" => d.path -> new TimeseriesMapper(d, unit.timeseries_namespace)
+ case "inventory" => d.path -> new InventoryMapper(d)
+ case _ => d.path -> new NullMapper(d.path)
+ }
+ }).toMap
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ Holder.logger.trace(rawEvent.getRawdata())
+
+ try {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val event = JsonParser.parse(rawEvent.getRawdata())
+ val path = event.Telemetry.encoding_path
+ if (transformers.contains(path)) {
+ Holder.logger.debug("Transforming for " + path)
+ val datapoints = transformers(path).transform(event)
+ val payloads = datapoints.map(d => {
+ new Payload(source, unit.output_topic,
+ rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ (true, payloads)
+ } else {
+ Holder.logger.trace("No transformer in unit for " + path)
+ (false, Set[Payload]())
+ }
+ } catch {
+ case t: Throwable => {
+ Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+ (false, Set[Payload]())
+ }
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
new file mode 100644
index 0000000..6c9ad80
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JObject
+import org.json4s.JsonAST.JValue
+import com.cisco.ztt.meta.Item
+import com.cisco.ztt.meta.Telemetry
+
+class TimeseriesMapper(config: Telemetry, namespace: String) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ val wantedKeys = config.keys.getOrElse(Array[Item]()).map( item => {
+ item.name -> item
+ }).toMap
+
+ val wantedValues = config.content.map( item => {
+ item.name -> item
+ }).toMap
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+
+ val timeseries = event.Rows.flatMap( row => {
+ val keys = row.Keys
+ .getOrElse(Map[String, String]())
+ .filter(k => wantedKeys.contains(k._1))
+ .map( k => {
+ val wanted = wantedKeys(k._1)
+ val name = if (wanted.ts_name != null) wanted.ts_name else k._1
+ name -> k._2
+ }).toMap + ("host" -> event.Telemetry.node_id_str)
+
+ // Flatten nested maps into container.key -> value
+ val expanded = row.Content.flatMap( v => {
+ if (v._2.isInstanceOf[JObject]) {
+ JsonParser.map(v._2).map( kv => {
+ (v._1 + "." + kv._1) -> kv._2
+ })
+ } else {
+ Map[String, JValue](v)
+ }
+ })
+
+ val items = expanded.filter(v => wantedValues.contains(v._1))
+ .map( data => {
+ val wanted = wantedValues(data._1)
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else data._1)
+ val value = JsonParser.int(data._2)
+ val datapoint = new TimeseriesDatapoint(name, value, row.Timestamp, keys)
+ datapoint
+ })
+ items.toSet
+ })
+
+ timeseries.toSet
+ }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
new file mode 100644
index 0000000..84f5bbb
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+class Metadata(val units: Array[Unit]) extends Serializable {
+ def topics() : Set[String] = {
+ val topics = units.map { case (unit : Unit) => {
+ unit.input_topic
+ }}
+
+ topics.toSet
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
new file mode 100644
index 0000000..4b72cb1
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.apache.log4j.Logger
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+case class Unit(
+ format: String,
+ input_topic: String,
+ processor: String,
+ output_topic: String,
+ publish_src: String,
+ timeseries_namespace: String,
+ xr_telemetry: Array[Telemetry],
+ ves_telemetry: Array[Telemetry])
+case class Telemetry(path: String, keys: Option[Array[Item]], content: Array[Item])
+case class Item(name: String, display_name: String, ts_name: String)
+
+object YamlReader {
+
+ private[this] val logger = Logger.getLogger(getClass().getName());
+
+ val mapper: ObjectMapper = new ObjectMapper(new YAMLFactory())
+ mapper.registerModule(DefaultScalaModule)
+
+ def load(pattern: String = "classpath*:meta/*.yaml"): Metadata = {
+ val patternResolver = new PathMatchingResourcePatternResolver()
+ val mappingLocations = patternResolver.getResources(pattern)
+ val units = mappingLocations.map(loc => {
+ logger.info("Reading metadata " + loc)
+ mapper.readValue(loc.getInputStream, classOf[Unit])
+ });
+ new Metadata(units)
+ }
+
+ def parse(yaml: String): Unit = {
+ val meta: Unit = mapper.readValue(yaml, classOf[Unit])
+ meta
+ }
+
+ def dump(meta: Unit) = {
+ println(" input = " + meta.input_topic)
+ println("output = " + meta.output_topic)
+ meta.xr_telemetry.map(m => {
+ println(" path = " + m.path)
+ println("keys:")
+ m.keys.getOrElse(Array[Item]()).map(item => {
+ println(" " + item.name + " -> " + item.display_name)
+ });
+ println("content:")
+ m.content.map(item => {
+ println(" " + item.name + " -> " + item.display_name)
+ });
+ });
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
new file mode 100644
index 0000000..4019262
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.meta.Telemetry
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JValue
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Item
+import org.json4s.JsonAST.JObject
+
+
+class VesMapper(config: Telemetry, namespace: String) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = {
+
+ val keys = config.keys.getOrElse(Array[Item]()).map( k => {
+ val value = map.get(k.name).get.toString()
+ k.name -> value
+ }).toMap + ("host" -> host)
+
+ val items = config.content.map( wanted => {
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else wanted.name)
+ val value = map.get(wanted.name).get.toString()
+
+ new TimeseriesDatapoint(name, value, timestamp, keys)
+ })
+
+ items.toSet
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
new file mode 100644
index 0000000..5eaf8f3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.Transformer
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+import org.apache.log4j.Logger
+import org.json4s.jackson.JsonMethods
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.JObject
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JArray
+
+
+class VesTransformer(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
+ val mappers = unit.ves_telemetry.map(d => {
+ d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace)
+ }) //.toMap
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val parsed = JsonMethods.parse(rawEvent.getRawdata)
+
+ if (! parsed.isInstanceOf[JObject]) {
+ Holder.logger.warn("Cannot process parsed JSON")
+ return (false, Set[Payload]())
+ }
+ val value = parsed.asInstanceOf[JObject].values.get("event").get.asInstanceOf[Map[String, JValue]]
+ val header = value.get("commonEventHeader").get.asInstanceOf[Map[String,Any]]
+ val host = header.get("reportingEntityName").get.toString
+ val timestamp = header.get("lastEpochMicrosec").get.toString.dropRight(3)
+
+ val generated = mappers.flatMap(r => {
+ val path = r._1
+ val mapper = r._2
+
+ val datapoints = visit(path, value, mapper, host, timestamp)
+ datapoints.map(d => {
+ new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ }).toSet
+ (true, generated)
+ }
+
+ def visit(
+ path: Array[String],
+ map: Map[String, Any],
+ mapper: VesMapper,
+ host: String,
+ timestamp: String): Set[TimeseriesDatapoint] = {
+ if (path.length > 0) {
+ val option = map.get(path.head)
+ option match {
+ case None => {
+ Holder.logger.warn("VES mapper failed to dereference JSON " + path.head)
+ return Set[TimeseriesDatapoint]()
+ }
+
+ case _ => {
+ option.get match {
+ case l: List[_] => {
+ val list = l.asInstanceOf[List[Map[String, Any]]]
+ return list.flatMap(sub => {
+ visit(path.tail, sub, mapper, host, timestamp)
+ }).toSet
+ }
+ case m: Map[_, _] => {
+ val sub = m.asInstanceOf[Map[String, Any]]
+ return visit(path.tail, sub, mapper, host, timestamp)
+ }
+
+ }
+ }
+ }
+ } else {
+ val datapoints = mapper.transform(map, host, timestamp)
+ return datapoints
+ }
+
+ Set[TimeseriesDatapoint]()
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
new file mode 100644
index 0000000..57339f8
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[DStream]]s to Kafka
+ * @param dStream [[DStream]] to be written to Kafka
+ */
+class DStreamKafkaWriter[T: ClassTag](@transient private val dStream: DStream[T])
+ extends KafkaWriter[T] with Serializable {
+ /**
+ * Write a [[DStream]] to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+ * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+ * @param callback an optional [[Callback]] to be called after each write, default value is None.
+ */
+ override def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit =
+ dStream.foreachRDD { rdd =>
+ val rddWriter = new RDDKafkaWriter[T](rdd)
+ rddWriter.writeToKafka(producerConfig, transformFunc, callback)
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
new file mode 100644
index 0000000..2fbf6bc
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import java.util.concurrent.{Callable, ExecutionException, TimeUnit}
+
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+/** Cache of [[KafkaProducer]]s */
+object KafkaProducerCache {
+ private type ProducerConf = Seq[(String, Object)]
+ private type ExProducer = KafkaProducer[_, _]
+
+ private val removalListener = new RemovalListener[ProducerConf, ExProducer]() {
+ override def onRemoval(notif: RemovalNotification[ProducerConf, ExProducer]): Unit =
+ notif.getValue.close()
+ }
+
+ private val cacheExpireTimeout = 10.minutes.toMillis
+ private val cache = CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
+ .removalListener(removalListener)
+ .build[ProducerConf, ExProducer]()
+
+ /**
+ * Retrieve a [[KafkaProducer]] in the cache or create a new one
+ * @param producerConfig producer configuration for creating [[KafkaProducer]]
+ * @return a [[KafkaProducer]] already in the cache or a new one
+ */
+ def getProducer[K, V](producerConfig: Map[String, Object]): KafkaProducer[K, V] =
+ try {
+ cache.get(mapToSeq(producerConfig), new Callable[KafkaProducer[K, V]] {
+ override def call(): KafkaProducer[K, V] = new KafkaProducer[K, V](producerConfig.asJava)
+ }).asInstanceOf[KafkaProducer[K, V]]
+ } catch {
+ case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
+ if e.getCause != null => throw e.getCause
+ }
+
+ /**
+ * Flush and close the [[KafkaProducer]] in the cache associated with this config
+ * @param producerConfig producer configuration associated to a [[KafkaProducer]]
+ */
+ def close(producerConfig: Map[String, Object]): Unit = cache.invalidate(mapToSeq(producerConfig))
+
+ private def mapToSeq(m: Map[String, Object]): Seq[(String, Object)] = m.toSeq.sortBy(_._1)
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
new file mode 100644
index 0000000..90d2bb1
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used to write DStreams, RDDs and Datasets to Kafka
+ *
+ * Example usage:
+ * {{{
+ * import com.github.benfradet.spark.kafka.writer.KafkaWriter._
+ * import org.apache.kafka.common.serialization.StringSerializer
+ *
+ * val topic = "my-topic"
+ * val producerConfig = Map(
+ * "bootstrap.servers" -> "127.0.0.1:9092",
+ * "key.serializer" -> classOf[StringSerializer].getName,
+ * "value.serializer" -> classOf[StringSerializer].getName
+ * )
+ *
+ * val dStream: DStream[String] = ...
+ * dStream.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](topic, s)
+ * )
+ *
+ * val rdd: RDD[String] = ...
+ * rdd.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](localTopic, s)
+ * )
+ *
+ * val dataset: Dataset[Foo] = ...
+ * dataset.writeToKafka(
+ * producerConfig,
+ * f => new ProducerRecord[String, String](localTopic, f.toString)
+ * )
+ *
+ * val dataFrame: DataFrame = ...
+ * dataFrame.writeToKafka(
+ * producerConfig,
+ * r => new ProducerRecord[String, String](localTopic, r.get(0).toString)
+ * )
+ * }}}
+ * It is also possible to provide a callback for each write to Kafka.
+ *
+ * This is optional and has a value of None by default.
+ *
+ * Example Usage:
+ * {{{
+ * @transient val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")
+ *
+ * val dStream: DStream[String] = ...
+ * dStream.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](topic, s),
+ * Some(new Callback with Serializable {
+ * override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
+ * if (Option(e).isDefined) {
+ * log.warn("error sending message", e)
+ * } else {
+ * log.info(s"write succeeded, offset: ${metadata.offset()")
+ * }
+ * }
+ * })
+ * )
+ * }}}
+ */
+abstract class KafkaWriter[T: ClassTag] extends Serializable {
+ /**
+ * Write a DStream, RDD, or Dataset to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+ * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+ * @param callback an optional [[Callback]] to be called after each write, default value is None.
+ */
+ def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
new file mode 100644
index 0000000..c6f6133
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[RDD]]s to Kafka
+ * @param rdd [[RDD]] to be written to Kafka
+ */
+class RDDKafkaWriter[T: ClassTag](@transient private val rdd: RDD[T])
+ extends KafkaWriter[T] with Serializable {
+ /**
+ * Write a [[RDD]] to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+ * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
+ * @param callback an optional [[Callback]] to be called after each write, default value is None.
+ */
+ override def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit =
+ rdd.foreachPartition { partition =>
+ val producer = KafkaProducerCache.getProducer[K, V](producerConfig)
+ partition
+ .map(transformFunc)
+ .foreach(record => producer.send(record, callback.orNull))
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
new file mode 100644
index 0000000..04f5ba7
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/** Implicit conversions between
+ * - [[DStream]] -> [[KafkaWriter]]
+ * - [[RDD]] -> [[KafkaWriter]]
+ * - [[Dataset]] -> [[KafkaWriter]]
+ * - [[DataFrame]] -> [[KafkaWriter]]
+ */
+package object writer {
+ /**
+ * Convert a [[DStream]] to a [[KafkaWriter]] implicitly
+ * @param dStream [[DStream]] to be converted
+ * @return [[KafkaWriter]] ready to write messages to Kafka
+ */
+ implicit def dStreamToKafkaWriter[T: ClassTag, K, V](dStream: DStream[T]): KafkaWriter[T] =
+ new DStreamKafkaWriter[T](dStream)
+
+ /**
+ * Convert a [[RDD]] to a [[KafkaWriter]] implicitly
+ * @param rdd [[RDD]] to be converted
+ * @return [[KafkaWriter]] ready to write messages to Kafka
+ */
+ implicit def rddToKafkaWriter[T: ClassTag, K, V](rdd: RDD[T]): KafkaWriter[T] =
+ new RDDKafkaWriter[T](rdd)
+
+}
diff --git a/pnda-ztt-app/src/test/resources/application.properties b/pnda-ztt-app/src/test/resources/application.properties
new file mode 100644
index 0000000..c99a8ad
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/application.properties
@@ -0,0 +1,8 @@
+app.job_name=collectd.tsdb
+kafka.brokers=localhost:9092
+kafka.zookeeper=localhost:2181
+app.checkpoint_path=/tmp
+app.processing_parallelism=1
+app.batch_size_seconds=5
+kafka.consume_from_beginning=false
+opentsdb.ip=localhost:4242
diff --git a/pnda-ztt-app/src/test/resources/log4j.testing.properties b/pnda-ztt-app/src/test/resources/log4j.testing.properties
new file mode 100644
index 0000000..c3b266c
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/log4j.testing.properties
@@ -0,0 +1,14 @@
+log4j.rootCategory=WARN,console
+log4j.logger.org=WARN
+log4j.logger.com.cisco.pnda=DEBUG,console
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=DEBUG,console
+log4j.additivity.com.cisco.ztt=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.immediateFlush=true
+log4j.appender.console.encoding=UTF-8
+
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%d %-5p %c - %m%n
diff --git a/pnda-ztt-app/src/test/resources/meta/test-one.yaml b/pnda-ztt-app/src/test/resources/meta/test-one.yaml
new file mode 100644
index 0000000..9900043
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/meta/test-one.yaml
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/resources/meta/test-three.yaml b/pnda-ztt-app/src/test/resources/meta/test-three.yaml
new file mode 100644
index 0000000..9900043
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/meta/test-three.yaml
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/resources/meta/test-two.yaml b/pnda-ztt-app/src/test/resources/meta/test-two.yaml
new file mode 100644
index 0000000..9900043
--- /dev/null
+++ b/pnda-ztt-app/src/test/resources/meta/test-two.yaml
@@ -0,0 +1,14 @@
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala
new file mode 100644
index 0000000..c08ae2f
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.telemetry.xr
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.json4s.JsonAST.JArray
+import com.cisco.ztt.cisco.xr.telemetry.JsonParser
+
+class JsonParserTest extends FlatSpec with Matchers {
+ val cpu_json = """
+{
+ "Source": "172.16.1.157:27059",
+ "Telemetry": {
+ "node_id_str": "IOS-XRv9k-edge-1",
+ "subscription_id_str": "cpu",
+ "encoding_path": "Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization",
+ "collection_id": 265673,
+ "collection_start_time": 1505905434090,
+ "msg_timestamp": 1505905434090,
+ "collection_end_time": 1505905434103
+ },
+ "Rows": [
+ {
+ "Timestamp": 1505905434099,
+ "Keys": {
+ "node-name": "0/RP0/CPU0"
+ },
+ "Content": {
+ "process-cpu_PIPELINE_EDIT": [
+ {
+ "process-cpu-fifteen-minute": 0,
+ "process-cpu-five-minute": 0,
+ "process-cpu-one-minute": 0,
+ "process-id": 1,
+ "process-name": "init"
+ },
+ {
+ "process-cpu-fifteen-minute": 0,
+ "process-cpu-five-minute": 0,
+ "process-cpu-one-minute": 0,
+ "process-id": 1544,
+ "process-name": "bash"
+ },
+ {
+ "process-cpu-fifteen-minute": 0,
+ "process-cpu-five-minute": 0,
+ "process-cpu-one-minute": 0,
+ "process-id": 26436,
+ "process-name": "sleep"
+ }
+ ],
+ "total-cpu-fifteen-minute": 6,
+ "total-cpu-five-minute": 6,
+ "total-cpu-one-minute": 6
+ }
+ }
+ ]
+}
+ """
+
+ "JsonParser" should "successfully parse cpu telemetry JSON" in {
+ val event = JsonParser.parse(cpu_json)
+
+ event.Telemetry.subscription_id_str should be ("cpu")
+ event.Rows(0).Keys.size should be (1)
+
+ val subrows = event.Rows(0).Content("process-cpu_PIPELINE_EDIT")
+ val extracted = JsonParser.array(subrows)
+
+ extracted.size should be (3)
+ extracted(0).size should be (5)
+ }
+
+ val null_keys = """
+{
+ "Source": "172.16.1.157:49227",
+ "Telemetry": {
+ "node_id_str": "IOS-XRv9k-edge-1",
+ "subscription_id_str": "Logging",
+ "encoding_path": "Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics",
+ "collection_id": 925712,
+ "collection_start_time": 1507552918199,
+ "msg_timestamp": 1507552918199,
+ "collection_end_time": 1507552918203
+ },
+ "Rows": [
+ {
+ "Timestamp": 1507552918201,
+ "Keys": null,
+ "Content": {
+ "buffer-logging-stats": {
+ "buffer-size": 2097152,
+ "is-log-enabled": "true",
+ "message-count": 221,
+ "severity": "message-severity-debug"
+ }
+ }
+ }
+ ]
+}
+ """
+
+ it should "successfully parse JSON with null keys" in {
+ val event = JsonParser.parse(null_keys)
+
+ event.Rows(0).Keys.size should be (0)
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala
new file mode 100644
index 0000000..b133c9e
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.telemetry.xr
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import com.cisco.ztt.cisco.xr.telemetry.TimeseriesMapper
+import com.cisco.ztt.meta.YamlReader
+import com.cisco.ztt.cisco.xr.telemetry.JsonParser
+
+class TelemetryMapperTest extends FlatSpec with Matchers {
+
+ val ipv6_yaml = """
+input_topic: telemetry.avro
+output_topic: timeseries
+timeseries_namespace: ipv6
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic
+ keys:
+ - name: node-name
+ display_name: "Node Name"
+ content:
+ - name: ipv6.total-packets
+ display_name: "Total IPV6 Packets"
+ ts_name: total-ipv6-packets
+
+ - name: icmp.total-messages
+ display_name: "Total ICMP Messages"
+"""
+
+ val ipv6_json = """
+{
+ "Source": "172.16.1.157:56197",
+ "Telemetry": {
+ "node_id_str": "IOS-XRv9k-edge-1",
+ "subscription_id_str": "ipv6-io",
+ "encoding_path": "Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic",
+ "collection_id": 282962,
+ "collection_start_time": 1506008887021,
+ "msg_timestamp": 1506008887021,
+ "collection_end_time": 1506008887039
+ },
+ "Rows": [
+ {
+ "Timestamp": 1506008887031,
+ "Keys": {
+ "node-name": "0/RP0/CPU0"
+ },
+ "Content": {
+ "icmp": {
+ "output-messages": 0,
+ "total-messages": 4,
+ "unknown-error-type-messages": 0
+ },
+ "ipv6": {
+ "bad-header-packets": 0,
+ "total-packets": 12,
+ "unknown-protocol-packets": 0
+ },
+ "ipv6-node-discovery": {
+ "received-redirect-messages": 0,
+ "sent-redirect-messages": 0
+ }
+ }
+ }
+ ]
+}
+"""
+
+ "TelemetryMapper" should "Map to wanted timeseries values" in {
+ val meta = YamlReader.parse(ipv6_yaml)
+ val mapper = new TimeseriesMapper(meta.xr_telemetry(0), "demo")
+
+ val event = JsonParser.parse(ipv6_json)
+ val timeseries = mapper.transform(event).toList
+
+ timeseries.length should be (2)
+ timeseries(0).metric should be ("demo.icmp.total-messages")
+ timeseries(1).metric should be ("demo.total-ipv6-packets")
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala
new file mode 100644
index 0000000..2ee1749
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.scalatest.Matchers
+import org.scalatest.FlatSpec
+import com.cisco.ztt.TimeseriesDatapoint
+import java.io.StringWriter
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+class DatapointTest extends FlatSpec with Matchers {
+
+ "TimeseriesDatapoint" should "serialize to json" in {
+ val data = Array(
+ new TimeseriesDatapoint("packets-in", "5", "1000000",
+ Map("host" -> "host1", "inteface" -> "GigabitEthernet0/0/0/1")),
+ new TimeseriesDatapoint("packets-out", "5", "1000000",
+ Map("host" -> "host1", "inteface" -> "GigabitEthernet0/0/0/1"))
+ )
+
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, data)
+ val json = out.toString()
+ }
+
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala
new file mode 100644
index 0000000..eabe54e
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+class YamlReaderTest extends FlatSpec with Matchers {
+
+ val interface_yaml = """
+input_topic: telemetry
+output_topic: timeseries
+
+xr_telemetry:
+ - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+ keys:
+ - name: interface-name
+ display_name: "Interface Name"
+
+ content:
+ - name: bytes-received
+ display_name: "Bytes Received"
+
+ - name: bytes-sent
+ display_name: "Bytes Sent"
+
+ - name: packets-received
+ display_name: "Packets Received"
+
+ - name: packets-sent
+ display_name: "Packets Sent"
+"""
+
+ "YamlReader" should "successfully parse YAML text" in {
+ val meta = YamlReader.parse(interface_yaml)
+
+ meta.input_topic should be ("telemetry")
+ meta.output_topic should be ("timeseries")
+ meta.xr_telemetry.length should be (1)
+ meta.xr_telemetry(0).keys.get.length should be(1)
+ meta.xr_telemetry(0).content.length should be(4)
+ }
+
+ it should "read multiple files from classpath" in {
+ val meta = YamlReader.load("classpath*:meta/test-*.yaml")
+ meta.units.length should be (3)
+ }
+}
diff --git a/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala
new file mode 100644
index 0000000..58f87fe
--- /dev/null
+++ b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ves.telemetry
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.YamlReader
+import com.cisco.ztt.ves.telemetry.VesTransformer
+import org.apache.log4j.BasicConfigurator
+
+class VesTransformerTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+ before {
+ BasicConfigurator.configure();
+ }
+
+ val yaml = """
+format: ves
+input_topic: ves.avro
+processor: timeseries
+output_topic: timeseries
+timeseries_namespace: nic
+
+ves_telemetry:
+ - path: measurementsForVfScalingFields/vNicUsageArray
+ keys:
+ - name: vNicIdentifier
+ content:
+ - name: receivedTotalPacketsDelta
+"""
+ val payload = """{
+ "event": {
+ "commonEventHeader": {
+ "startEpochMicrosec": 1537258053361276,
+ "sourceId": "bab0c4de-cfb8-4a0d-b7e4-a3d4020bb667",
+ "eventId": "TrafficStats_1.2.3.4",
+ "nfcNamingCode": "vVNF",
+ "reportingEntityId": "No UUID available",
+ "internalHeaderFields": {
+ "collectorTimeStamp": "Tue, 09 18 2018 08:01:41 GMT"
+ },
+ "eventType": "HTTP request rate",
+ "priority": "Normal",
+ "version": 3,
+ "reportingEntityName": "fwll",
+ "sequence": 6108,
+ "domain": "measurementsForVfScaling",
+ "lastEpochMicrosec": 1537258063557408,
+ "eventName": "vFirewallBroadcastPackets",
+ "sourceName": "vFW_SINK_VNF2",
+ "nfNamingCode": "vVNF"
+ },
+ "measurementsForVfScalingFields": {
+ "cpuUsageArray": [
+ {
+ "percentUsage": 0,
+ "cpuIdentifier": "cpu1",
+ "cpuIdle": 100,
+ "cpuUsageSystem": 0,
+ "cpuUsageUser": 0
+ }
+ ],
+ "measurementInterval": 10,
+ "requestRate": 9959,
+ "vNicUsageArray": [
+ {
+ "transmittedOctetsDelta": 0,
+ "receivedTotalPacketsDelta": 0,
+ "vNicIdentifier": "eth0",
+ "valuesAreSuspect": "true",
+ "transmittedTotalPacketsDelta": 0,
+ "receivedOctetsDelta": 0
+ }
+ ],
+ "measurementsForVfScalingVersion": 2.1
+ }
+ }
+}
+"""
+ "VesTransformer" should "successfully transform a VES event" in {
+ val unit = YamlReader.parse(yaml)
+ val ves = new VesTransformer(unit)
+
+ val event = new DataPlatformEvent("src", System.currentTimeMillis(), "127.0.0.1", payload)
+ val (status, result) = ves.transform(event)
+
+ status should be(true)
+ result.toList.length should be (1)
+ result.toList(0).datapoint.metric should be ("nic.receivedTotalPacketsDelta")
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore
new file mode 100644
index 0000000..d392f0e
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore
@@ -0,0 +1 @@
+*.jar
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties
new file mode 100644
index 0000000..8c1b965
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties
@@ -0,0 +1,8 @@
+kafka.brokers=${environment_kafka_brokers}
+kafka.zookeeper=${environment_kafka_zookeeper}
+app.checkpoint_path=${component_checkpoint_path}
+app.job_name=${component_job_name}
+app.processing_parallelism=${component_processing_parallelism}
+app.batch_size_seconds=${component_batch_size_seconds}
+kafka.consume_from_beginning=${component_consume_from_beginning}
+opentsdb.ip=${environment_opentsdb} \ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties
new file mode 100644
index 0000000..6a3cab1
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=ERROR,rolling
+log4j.logger.com.cisco.pnda=${component.log_level},rolling
+log4j.additivity.com.cisco.pnda=false
+log4j.logger.com.cisco.ztt=DEBUG,rolling
+log4j.additivity.com.cisco.ztt=false
+
+log4j.appender.rolling=org.apache.log4j.RollingFileAppender
+log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
+log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
+log4j.appender.rolling.maxFileSize=50MB
+log4j.appender.rolling.maxBackupIndex=1
+log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/spark.log
+log4j.appender.rolling.encoding=UTF-8
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json
new file mode 100644
index 0000000..1029987
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json
@@ -0,0 +1,8 @@
+[
+ {
+ "name": "nic.receivedTotalPacketsDelta"
+ },
+ {
+ "name": "cpu.percentUsage"
+ }
+] \ No newline at end of file
diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
new file mode 100644
index 0000000..2f8ab6a
--- /dev/null
+++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json
@@ -0,0 +1,10 @@
+{
+ "main_jar": "PndaZTTApp.jar",
+ "main_class": "com.cisco.ztt.App",
+ "log_level": "INFO",
+ "batch_size_seconds" : "2",
+ "processing_parallelism" : "1",
+ "checkpoint_path": "",
+ "input_topic": "ves.avro",
+ "consume_from_beginning": "false"
+}