diff options
Diffstat (limited to 'pnda-ztt-app/src')
61 files changed, 2763 insertions, 0 deletions
diff --git a/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc new file mode 100644 index 0000000..5450771 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/dataplatform-raw.avsc @@ -0,0 +1,10 @@ +{"namespace": "com.cisco.pnda", + "type": "record", + "name": "PndaRecord", + "fields": [ + {"name": "timestamp", "type": "long"}, + {"name": "src", "type": "string"}, + {"name": "host_ip", "type": "string"}, + {"name": "rawdata", "type": "bytes"} + ] +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml new file mode 100644 index 0000000..5d56f77 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/bgp-neighbor-af-table.yaml @@ -0,0 +1,17 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: bgp + +xr_telemetry: + - path: Cisco-IOS-XR-ipv4-bgp-oper:bgp/instances/instance/instance-active/default-vrf/afs/af/neighbor-af-table/neighbor + keys: + - name: af-name + display_name: "Address Family Name" + - name: instance-name + display_name: "Instance Name" + - name: neighbor-address + display_name: "Neighbor Address" + content: + - name: connection-up-count + display_name: "Connection Up Count" diff --git a/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml new file mode 100644 index 0000000..0ce8870 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/cpu-utilization.yaml @@ -0,0 +1,19 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: cpu + +xr_telemetry: + - path: Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization + keys: + - name: node-name + display_name: "Node Name" + content: + - name: total-cpu-one-minute + display_name: "One-minute CPU Total" + + - name: total-cpu-five-minute + display_name: "Five-minute CPU Total" + + - name: total-cpu-fifteen-minute + display_name: "Fifteen-minute CPU Total" diff --git a/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml new file mode 100644 index 0000000..2dbb4ae --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/fib-summary.yaml @@ -0,0 +1,27 @@ +format: cisco.xr.telemetry +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: fib + +xr_telemetry: + - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/fib-summaries/fib-summary + keys: + - name: node-name + display_name: "Node Name" + - name: protocol-name + display_name: "Protocol Name" + - name: vrf-name + display_name: "VRF Name" + content: + - name: extended-prefixes + display_name: "Num Extended Prefixes" + + - name: forwarding-elements + display_name: "Num Forwarding Elements" + + - name: next-hops + display_name: "Num Next Hops" + + - name: routes + display_name: "Num Routes" diff --git a/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml new file mode 100644 index 0000000..24fb808 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/interfaces-generic-counters.yaml @@ -0,0 +1,36 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: interface + +xr_telemetry: + - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters + keys: + - name: interface-name + display_name: "Interface Name" + content: + - name: bytes-received + display_name: "Bytes Received" + ts_name: bytes-in # rename the metric in OpenTSDB + + - name: bytes-sent + display_name: "Bytes Sent" + ts_name: bytes-out # rename the metric in OpenTSDB + + - name: packets-received + display_name: "Packets Received" + + - name: packets-sent + display_name: "Packets Sent" + + - name: broadcast-packets-received + display_name: "Broadcast Packets Received" + + - name: broadcast-packets-sent + display_name: "Broadcast Packets Sent" + + - name: multicast-packets-received + display_name: "Multicast Packets Received" + + - name: multicast-packets-sent + display_name: "Multicast Packets Sent" diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml new file mode 100644 index 0000000..3bc9689 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/inventory-entity.yaml @@ -0,0 +1,27 @@ +input_topic: telemetry.avro +processor: inventory +output_topic: inventory + +xr_telemetry: + - path: Cisco-IOS-XR-invmgr-oper:inventory/entities/entity/attributes/inv-basic-bag + keys: + - name: name + display_name: "Entity Name" + content: + - name: serial-number + display_name: "Serial Number" + + - name: description + display_name: "Description" + + - name: manufacturer-name + display_name: "Manufacturer" + + - name: model-name + display_name: "Model Name" + + - name: software-revision + display_name: "Software Revision" + + - name: vendor-type + display_name: "Vendor OID" diff --git a/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml new file mode 100644 index 0000000..db6386d --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/inventory-rack.yaml @@ -0,0 +1,27 @@ +input_topic: telemetry.avro +processor: inventory +output_topic: inventory + +xr_telemetry: + - path: Cisco-IOS-XR-invmgr-oper:inventory/racks/rack/attributes/inv-basic-bag + keys: + - name: name + display_name: "Entity Name" + content: + - name: serial-number + display_name: "Serial Number" + + - name: description + display_name: "Description" + + - name: manufacturer-name + display_name: "Manufacturer" + + - name: model-name + display_name: "Model Name" + + - name: software-revision + display_name: "Software Revision" + + - name: vendor-type + display_name: "Vendor OID" diff --git a/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml new file mode 100644 index 0000000..3a5dcba --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/ipv6-traffic.yaml @@ -0,0 +1,16 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: ipv6 + +xr_telemetry: + - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic + keys: + - name: node-name + display_name: "Node Name" + content: + - name: ipv6.total-packets + display_name: "Total IPV6 Packets" + + - name: icmp.total-messages + display_name: "Total ICMP Messages" diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml new file mode 100644 index 0000000..2360de6 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/lldp-neighbor-summary.yaml @@ -0,0 +1,58 @@ +format: cisco.xr.telemetry +input_topic: telemetry.avro +processor: inventory +output_topic: inventory + +xr_telemetry: + - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/summaries/summary + keys: + - name: device-id + display_name: "Device Id" + - name: interface-name + display_name: "Interface Name" + - name: node-name + display_name: "Node Name" + content: + - name: chassis-id + display_name: "Chassis Id" + + - name: device-id + display_name: "Device Id" + + - name: port-id-detail + display_name: "Port Id" + + - name: receiving-interface-name + display_name: "Receiving Interface Name" + + - name: enabled-capabilities + display_name: "Enabled Capabilities" + - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device + keys: + - name: device-id + display_name: "Device Id" + - name: interface-name + display_name: "Interface Name" + - name: node-name + display_name: "Node Name" + content: + - name: lldp-neighbor.chassis-id + display_name: "Chassis Id" + - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/details/detail + keys: + - name: device-id + display_name: "Device Id" + - name: interface-name + display_name: "Interface Name" + - name: node-name + display_name: "Node Name" + content: + - name: lldp-neighbor.chassis-id + display_name: "Chassis Id" + - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/interfaces/interface + keys: + - name: interface-name + - name: node-name + content: + - name: interface-name + - name: if-index diff --git a/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml new file mode 100644 index 0000000..e29e32c --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/lldp-stats.yaml @@ -0,0 +1,23 @@ +format: cisco.xr.telemetry +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: lldp.stats + +xr_telemetry: + - path: Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/statistics + keys: + - name: node-name + display_name: "Node Name" + content: + - name: received-packets + - name: transmitted-packets + - name: aged-out-entries + - name: bad-packets + - name: discarded-packets + - name: discarded-tl-vs + - name: encapsulation-errors + - name: out-of-memory-errors + - name: queue-overflow-errors + - name: table-overflow-errors + - name: unrecognized-tl-vs diff --git a/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml new file mode 100644 index 0000000..5d12152 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/logging-stats.yaml @@ -0,0 +1,11 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: logging + +xr_telemetry: + - path: Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics + content: + - name: buffer-logging-stats.message-count + ts_name: message-count + display_name: "Serial Number" diff --git a/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml new file mode 100644 index 0000000..6b3f657 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/memory-detail.yaml @@ -0,0 +1,31 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: memory + +xr_telemetry: + - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/detail + keys: + - name: node-name + display_name: "Node Name" + content: + - name: allocated-memory + display_name: "Allocated Memory" + + - name: free-application-memory + display_name: "Free Application Memory" + + - name: free-physical-memory + display_name: "Free Physical Memory" + + - name: ram-memory + display_name: "RAM Memory" + + - name: program-data + display_name: "Program Data" + + - name: program-stack + display_name: "Program Stack" + + - name: program-text + display_name: "Program Text" diff --git a/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml new file mode 100644 index 0000000..02adef6 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/memory-summary.yaml @@ -0,0 +1,25 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: memory.summary + +xr_telemetry: + - path: Cisco-IOS-XR-nto-misc-oper:memory-summary/nodes/node/summary + keys: + - name: node-name + display_name: "Node Name" + content: + - name: allocated-memory + display_name: "Allocated Memory" + + - name: free-application-memory + display_name: "Free Application Memory" + + - name: free-physical-memory + display_name: "Free Physical Memory" + + - name: ram-memory + display_name: "RAM Memory" + + - name: system-ram-memory + display_name: "System RAM Memopry" diff --git a/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml new file mode 100644 index 0000000..197b0b9 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/rib-oper.yaml @@ -0,0 +1,27 @@ +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: rib + +xr_telemetry: + - path: Cisco-IOS-XR-ip-rib-ipv4-oper:rib/vrfs/vrf/afs/af/safs/saf/ip-rib-route-table-names/ip-rib-route-table-name/protocol/bgp/as/information + keys: + - name: af-name + display_name: "Address Family Name" + - name: as + display_name: "Address Family" + - name: route-table-name + display_name: "Route table name" + - name: saf-name + display_name: "Saf name" + - name: vrf-name + display_name: "Vrf name" + content: + - name: active-routes-count + display_name: "Active Routes Count" + - name: instance + display_name: "Instance" + - name: paths-count + display_name: "Paths Count" + - name: routes-counts + display_name: "Routes Count" diff --git a/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml new file mode 100644 index 0000000..aaf1de8 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/ves-cpu.yaml @@ -0,0 +1,12 @@ +format: ves +input_topic: ves.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: cpu + +ves_telemetry: + - path: measurementsForVfScalingFields/cpuUsageArray + keys: + - name: cpuIdentifier + content: + - name: percentUsage diff --git a/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml new file mode 100644 index 0000000..a6ae3de --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/ves-nic.yaml @@ -0,0 +1,12 @@ +format: ves +input_topic: ves.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: nic + +ves_telemetry: + - path: measurementsForVfScalingFields/vNicUsageArray + keys: + - name: vNicIdentifier + content: + - name: receivedTotalPacketsDelta diff --git a/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml new file mode 100644 index 0000000..df466c3 --- /dev/null +++ b/pnda-ztt-app/src/main/resources/meta/vrf-summary.yaml @@ -0,0 +1,27 @@ +format: cisco.xr.telemetry +input_topic: telemetry.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: fib.vrf + +xr_telemetry: + - path: Cisco-IOS-XR-fib-common-oper:fib/nodes/node/protocols/protocol/vrfs/vrf/summary + keys: + - name: node-name + display_name: "Node Name" + - name: protocol-name + display_name: "Protocol Name" + - name: vrf-name + display_name: "VRF Name" + content: + - name: extended-prefixes + display_name: "Num Extended Prefixes" + + - name: forwarding-elements + display_name: "Num Forwarding Elements" + + - name: next-hops + display_name: "Num Next Hops" + + - name: routes + display_name: "Num Routes" diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala new file mode 100644 index 0000000..b143b62 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Name: StatReporter + * Purpose: Report batch processing metrics to the PNDA metric logger + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +/* +Copyright (c) 2016 Cisco and/or its affiliates. + +This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License"). +You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc. +and/or its affiliated entities, under various laws including copyright, international treaties, patent, +and/or contract. Any use of the material herein must be in accordance with the terms of the License. +All rights not expressly granted by the License are reserved. + +Unless required by applicable law or agreed to separately in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. +*/ + +package com.cisco.pnda + +import scala.util.control.NonFatal +import java.io.StringWriter +import java.io.PrintWriter +import org.apache.spark.streaming.scheduler.StreamingListener +import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted +import org.apache.log4j.Logger +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.DefaultHttpClient + +class StatReporter(appName: String, metricsUrl: String) extends StreamingListener { + + private[this] val logger = Logger.getLogger(getClass().getName()) + + override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = { + def doSend(metricName: String, metricValue: String) = { + try { + val httpClient = new DefaultHttpClient() + val post = new HttpPost(metricsUrl) + post.setHeader("Content-type", "application/json") + val ts = java.lang.System.currentTimeMillis() + val body = f"""{ + | "data": [{ + | "source": "application.$appName", + | "metric": "application.kpi.$appName.$metricName", + | "value": "$metricValue", + | "timestamp": $ts%d + | }], + | "timestamp": $ts%d + |}""".stripMargin + + logger.debug(body) + post.setEntity(new StringEntity(body)) + val response = httpClient.execute(post) + if (response.getStatusLine.getStatusCode() != 200) { + logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode()) + } + + } catch { + case NonFatal(t) => { + logger.error("POST failed: " + metricsUrl) + val sw = new StringWriter + t.printStackTrace(new PrintWriter(sw)) + logger.error(sw.toString) + } + } + } + doSend("processing-delay", batch.batchInfo.processingDelay.get.toString()) + doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString()) + doSend("num-records", batch.batchInfo.numRecords.toString()) + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java new file mode 100644 index 0000000..1a08bd4 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Name: DataPlatformEvent + * Purpose: Data model class for an avro event on Kafka + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +package com.cisco.pnda.model; + +import java.io.IOException; +import java.io.Serializable; + +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.ObjectMapper; + +public class DataPlatformEvent implements Serializable +{ + private static final long serialVersionUID = 1L; + protected static ObjectMapper _mapper = new ObjectMapper(); + + private String _src; + private Long _timestamp; + private String _hostIp; + private String _rawdata; + + public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata) + { + _src = src; + _timestamp = timestamp; + _hostIp = host_ip; + _rawdata = rawdata; + } + + public String getSrc() + { + return _src; + } + + public Long getTimestamp() + { + return _timestamp; + } + + public String getHostIp() + { + return _hostIp; + } + + public String getRawdata() + { + return _rawdata; + } + + @Override + public String toString() + { + try + { + return _mapper.writeValueAsString(this); + } + catch (Exception ex) + { + return null; + } + } + + @Override + public boolean equals(Object other) + { + boolean result = false; + if (other instanceof DataPlatformEvent) + { + DataPlatformEvent that = (DataPlatformEvent) other; + result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc()))) + && (this.getTimestamp() == that.getTimestamp() || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp()))) + && (this.getHostIp() == that.getHostIp() || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp()))) + && (this.getRawdata() == that.getRawdata() || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata()))); + } + return result; + + } + + public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException + { + return _mapper.readTree(_rawdata); + } + +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java new file mode 100644 index 0000000..ef2bc0d --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Name: DataPlatformEventDecoder + * Purpose: Encodes and secodes binary event data from kafka to/from a DataPlatformEvent object + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +package com.cisco.pnda.model; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.log4j.Logger; + + +public class DataPlatformEventCodec +{ + private static final Logger LOGGER = Logger.getLogger(DataPlatformEventCodec.class.getName()); + + private Schema.Parser _parser = new Schema.Parser(); + private DatumReader<GenericRecord> _reader; + private DatumWriter<GenericRecord> _writer; + private Schema _schema; + private String _schemaDef; + + public DataPlatformEventCodec(String schemaDef) throws IOException + { + _schemaDef = schemaDef; + _schema = _parser.parse(schemaDef); + _reader = new GenericDatumReader<GenericRecord>(_schema); + _writer = new GenericDatumWriter<GenericRecord>(_schema); + } + + public DataPlatformEvent decode(byte[] data) throws IOException + { + try + { + Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); + GenericRecord r = _reader.read(null, decoder); + return new DataPlatformEvent((String) r.get("src").toString(), + (Long) r.get("timestamp"), + (String) r.get("host_ip").toString(), + new String(((ByteBuffer)r.get("rawdata")).array())); + } + catch(Exception ex) + { + LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex); + throw new IOException(ex); + } + } + + final protected static char[] hexArray = "0123456789ABCDEF".toCharArray(); + public static String hexStr(byte[] bytes) { + char[] hexChars = new char[bytes.length * 2]; + for ( int j = 0; j < bytes.length; j++ ) { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = hexArray[v >>> 4]; + hexChars[j * 2 + 1] = hexArray[v & 0x0F]; + } + return new String(hexChars); + } + + public byte[] encode(DataPlatformEvent e) throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + GenericRecord datum = new GenericData.Record(_schema); + datum.put("src", e.getSrc()); + datum.put("timestamp", e.getTimestamp()); + datum.put("host_ip", e.getHostIp()); + datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8"))); + _writer.write(datum, encoder); + encoder.flush(); + out.close(); + return out.toByteArray(); + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java new file mode 100644 index 0000000..24a9d35 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Name: StaticHelpers + * Purpose: Helper functions + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +package com.cisco.pnda.model; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; + +public class StaticHelpers { + public static String loadResourceFile(String path) + { + InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path); + try + { + char[] buf = new char[2048]; + Reader r = new InputStreamReader(is, "UTF-8"); + StringBuilder s = new StringBuilder(); + while (true) + { + int n = r.read(buf); + if (n < 0) + break; + s.append(buf, 0, n); + } + return s.toString(); + } + catch (Exception e) + { + return null; + } + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala new file mode 100644 index 0000000..6bc2083 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt + +import org.apache.spark.streaming.StreamingContext +import com.cisco.pnda.StatReporter +import org.apache.log4j.Logger +import com.cisco.ztt.meta.YamlReader +import org.apache.log4j.BasicConfigurator + +object App { + + private[this] val logger = Logger.getLogger(getClass().getName()) + + def main(args: Array[String]) { + + BasicConfigurator.configure(); + + val props = AppConfig.loadProperties(); + val loggerUrl = props.getProperty("environment.metric_logger_url") + val appName = props.getProperty("component.application") + val checkpointDirectory = props.getProperty("app.checkpoint_path"); + val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds")); + + val metadata = YamlReader.load() + if (metadata.units.length == 0) { + logger.error("Trying to run app without metadata") + System.exit(1) + } + val pipeline = new ZttPipeline(metadata) + + // Create the streaming context, or load a saved one from disk + val ssc = if (checkpointDirectory.length() > 0) + StreamingContext.getOrCreate(checkpointDirectory, pipeline.create) else pipeline.create(); + + sys.ShutdownHookThread { + logger.info("Gracefully stopping Spark Streaming Application") + ssc.stop(true, true) + logger.info("Application stopped") + } + + if (loggerUrl != null) { + logger.info("Reporting stats to url: " + loggerUrl) + ssc.addStreamingListener(new StatReporter(appName, loggerUrl)) + } + logger.info("Starting spark streaming execution") + ssc.start() + ssc.awaitTermination() + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java new file mode 100644 index 0000000..a711cab --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Name: AppConfig + * Purpose: Load application properties file. + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +package com.cisco.ztt; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.apache.log4j.Logger; + +public class AppConfig +{ + private static final Logger LOGGER = Logger.getLogger(AppConfig.class); + + private static Properties _properties; + private static Object _lock = new Object(); + + public static Properties loadProperties() + { + synchronized (_lock) + { + if (_properties == null) + { + _properties = new Properties(); + try + { + InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties"); + _properties.load(is); + is.close(); + LOGGER.info("Properties loaded"); + } + catch (IOException e) + { + LOGGER.info("Failed to load properties", e); + System.exit(1); + } + } + return _properties; + } + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala new file mode 100644 index 0000000..c0bc61e --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Name: KafkaInput + * Purpose: Generate a dstream from Kafka + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +/* +Copyright (c) 2016 Cisco and/or its affiliates. + +This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License"). +You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc. +and/or its affiliated entities, under various laws including copyright, international treaties, patent, +and/or contract. Any use of the material herein must be in accordance with the terms of the License. +All rights not expressly granted by the License are reserved. + +Unless required by applicable law or agreed to separately in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. +*/ + +package com.cisco.ztt + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.kafka.KafkaUtils + +import com.cisco.pnda.model.DataPlatformEventCodec +import com.cisco.pnda.model.StaticHelpers + +import kafka.serializer.DefaultDecoder +import kafka.serializer.StringDecoder +import org.apache.log4j.Logger + +class KafkaInput extends Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def readFromKafka(ssc: StreamingContext, topic: String) = { + val props = AppConfig.loadProperties(); + val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers")) + if (props.getProperty("kafka.consume_from_beginning").toBoolean) { + kafkaParams.put("auto.offset.reset", "smallest"); + } + + Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list")) + Holder.logger.info("Registering with kafka using topic " + topic) + + val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder]( + ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism"))); + + // Decode avro container format + val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc"); + val rawMessages = messages.map(x => { + val eventDecoder = new DataPlatformEventCodec(avroSchemaString); + val payload = x._2; + val dataPlatformEvent = eventDecoder.decode(payload); + dataPlatformEvent; + }); + rawMessages; + }; +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala new file mode 100644 index 0000000..1041d00 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt + + + + + +import java.io.StringWriter + +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.log4j.Logger +import org.apache.spark.streaming.dstream.DStream + +import com.cisco.pnda.model.DataPlatformEvent +import com.cisco.pnda.model.DataPlatformEventCodec +import com.cisco.pnda.model.StaticHelpers +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.github.benfradet.spark.kafka.writer.dStreamToKafkaWriter + + +class KafkaOutput extends Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def writeToKafka(output: DStream[Payload]) = { + + val props = AppConfig.loadProperties(); + val producerConfig = Map( + "bootstrap.servers" -> props.getProperty("kafka.brokers"), + "key.serializer" -> classOf[StringSerializer].getName, + "value.serializer" -> classOf[ByteArraySerializer].getName +) + output.writeToKafka( + producerConfig, + s => { + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + + val out = new StringWriter + mapper.writeValue(out, s.datapoint) + val json = out.toString() + + val event = new DataPlatformEvent(s.publishSrc, s.timestamp, s.hostIp, json) + val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc"); + val codec = new DataPlatformEventCodec(avroSchemaString); + + new ProducerRecord[String, Array[Byte]](s.publishTopic, codec.encode(event)) + } + ) + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala new file mode 100644 index 0000000..9077986 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Name: OpenTSDBOutput + * Purpose: Write a dstream to OpenTSDB + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +/* +Copyright (c) 2016 Cisco and/or its affiliates. + +This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License"). +You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc. +and/or its affiliated entities, under various laws including copyright, international treaties, patent, +and/or contract. Any use of the material herein must be in accordance with the terms of the License. +All rights not expressly granted by the License are reserved. + +Unless required by applicable law or agreed to separately in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. +*/ + +package com.cisco.ztt + +import java.io.StringWriter + +import scala.Iterator + +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.StringEntity +import org.apache.http.util.EntityUtils +import org.apache.http.impl.client.DefaultHttpClient +import org.apache.log4j.Logger +import org.apache.spark.streaming.dstream.DStream + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import scala.collection.mutable.ArrayBuffer + +class OpenTSDBOutput extends Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def putOpentsdb[T](opentsdbIP: String, + input: DStream[Payload]): DStream[Payload] = { + input.mapPartitions(partition => { + var size = 0 + val output = partition.grouped(20).flatMap(group => { + val data = ArrayBuffer[TimeseriesDatapoint]() + val passthru = group.map(item => { + data += item.datapoint + item + }) + + size += data.length + + if (data.length > 0) { + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + + val out = new StringWriter + mapper.writeValue(out, data) + val json = out.toString() + + Holder.logger.debug("Posting " + data.length + " datapoints to OpenTSDB") + Holder.logger.debug(json) + + if (opentsdbIP != null && opentsdbIP.length() > 0) { + val openTSDBUrl = "http://" + opentsdbIP + "/api/put" + try { + val httpClient = new DefaultHttpClient() + val post = new HttpPost(openTSDBUrl) + post.setHeader("Content-type", "application/json") + post.setEntity(new StringEntity(json)) + val response = httpClient.execute(post) + // Holder.logger.debug(EntityUtils.toString(response.getEntity())) + } catch { + case t: Throwable => { + Holder.logger.warn(t) + } + } + } + } else { + Holder.logger.debug("No datapoints to post to OpenTSDB") + } + passthru + }) + output + }); + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala new file mode 100644 index 0000000..2961bd0 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt + +class Payload( + val publishSrc: String, + val publishTopic: String, + val hostIp: String, + val timestamp: Long, + val datapoint: TimeseriesDatapoint) { + +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala new file mode 100644 index 0000000..7e447bf --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt + +class TimeseriesDatapoint( + val metric: String, + val value: String, + val timestamp: String, + val tags: Map[String, String]) { + +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala new file mode 100644 index 0000000..aae1e8c --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt + + +import com.cisco.ztt.meta.Metadata +import com.cisco.ztt.cisco.xr.telemetry.TelemetryDispatcher +import com.cisco.ztt.ves.telemetry.VesTransformer + +class TransformManager(metadata: Metadata) { + + val transformers: Array[Transformer] = metadata.units.map( unit => { + + unit.format match { + case "ves" => new VesTransformer(unit) + case "cisco.xr.telemetry" => new TelemetryDispatcher(unit) + case _ => new TelemetryDispatcher(unit) + } + }) + + val byTopic = transformers.groupBy( t => { t.inputTopic }) +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala new file mode 100644 index 0000000..861eb5e --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt + +import com.cisco.pnda.model.DataPlatformEvent +import org.apache.spark.streaming.dstream.DStream + +trait Transformer { + def inputTopic: String + def transform(event: DataPlatformEvent): (Boolean, Set[Payload]) +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala new file mode 100644 index 0000000..c3b3532 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Name: KafkaPipeline + * Purpose: Set up the spark streaming processing graph. + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +/* +Copyright (c) 2016 Cisco and/or its affiliates. + +This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License"). +You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc. +and/or its affiliated entities, under various laws including copyright, international treaties, patent, +and/or contract. Any use of the material herein must be in accordance with the terms of the License. +All rights not expressly granted by the License are reserved. + +Unless required by applicable law or agreed to separately in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. +*/ + +package com.cisco.ztt + +import org.apache.log4j.Logger +import org.apache.spark.SparkConf +import org.apache.spark.streaming.Seconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.DStream +import com.cisco.ztt.meta.Metadata +import com.cisco.pnda.model.DataPlatformEvent + +class ZttPipeline(metadata: Metadata) extends Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def create() = { + val props = AppConfig.loadProperties(); + val checkpointDirectory = props.getProperty("app.checkpoint_path"); + val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds")); + + val sparkConf = new SparkConf(); + Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory) + val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds)); + + if (checkpointDirectory.length() > 0) { + ssc.checkpoint(checkpointDirectory) + } + + val transformManager = new TransformManager(metadata) + val streams = transformManager.byTopic.map( x => { + val topic = x._1 + val transformers = x._2 + + val inputStream = new KafkaInput().readFromKafka(ssc, topic) + + val timeseriesStream = inputStream.flatMap(dataPlatformEvent => { + var handled = false; + val datapoints = transformers.flatMap(transformer => { + val (ran, data) = transformer.transform(dataPlatformEvent) + handled |= ran; + data; + }) + if (!handled) { + Holder.logger.info("Did not process " + dataPlatformEvent.getRawdata) + } + datapoints + }) + + val outputStream = + new OpenTSDBOutput().putOpentsdb( + props.getProperty("opentsdb.ip"), + timeseriesStream); + + new KafkaOutput().writeToKafka(outputStream) + }); + + ssc; + }: StreamingContext +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala new file mode 100644 index 0000000..00470ff --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.xr.telemetry + +import com.cisco.ztt.TimeseriesDatapoint +import org.apache.log4j.Logger +import com.cisco.ztt.meta.Telemetry + +class InventoryMapper(config: Telemetry) extends Mapper with Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def transform(event: TEvent): Set[TimeseriesDatapoint] = { + Holder.logger.debug("InventoryMapper is sinking " + config.path) + Set[TimeseriesDatapoint]() + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala new file mode 100644 index 0000000..97f4da3 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.xr.telemetry + +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JArray +import org.json4s.JsonAST.JInt +import org.json4s.JsonAST.JValue +import org.json4s.jackson.JsonMethods +import org.json4s.jvalue2extractable +import org.json4s.string2JsonInput +import scala.reflect.ManifestFactory.arrayType +import scala.reflect.ManifestFactory.classType +import org.json4s.JsonAST.JObject + + +case class TRow(Timestamp: String, Keys: Option[Map[String, String]], Content: Map[String, JValue]) +case class THeader(node_id_str: String, subscription_id_str: String, + encoding_path: String, collection_id: Int, collection_start_time: Int, + msg_timestamp: Int, collection_end_time: Int) +case class TEvent(Source: String, Telemetry: THeader, Rows: Array[TRow]) + + +object JsonParser { + + def parse(json: String): TEvent = { + implicit val formats = DefaultFormats + + val parsed = JsonMethods.parse(json) + val event = parsed.extract[TEvent] + event + } + + def array(value: JValue): Array[Map[String,JValue]] = { + implicit val formats = DefaultFormats + val array = value.asInstanceOf[JArray] + array.extract[Array[Map[String, JValue]]] + } + + def map(value: JValue): Map[String,JValue] = { + implicit val formats = DefaultFormats + val map = value.asInstanceOf[JObject] + map.extract[Map[String, JValue]] + } + + def int(value: JValue): String = { + value.asInstanceOf[JInt].num.toString + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala new file mode 100644 index 0000000..2983e88 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.xr.telemetry + +import com.cisco.ztt.TimeseriesDatapoint + +trait Mapper { + def transform(event: TEvent): Set[TimeseriesDatapoint] +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala new file mode 100644 index 0000000..9f9c775 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.xr.telemetry + +import com.cisco.ztt.TimeseriesDatapoint +import org.apache.log4j.Logger + + +class NullMapper(path: String) extends Mapper with Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def transform(event: TEvent): Set[TimeseriesDatapoint] = { + Holder.logger.debug("NullMapper is sinking " + path) + Set[TimeseriesDatapoint]() + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala new file mode 100644 index 0000000..b7deadc --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.xr.telemetry + +import org.apache.log4j.Logger + +import com.cisco.pnda.model.DataPlatformEvent +import com.cisco.ztt.TimeseriesDatapoint +import com.cisco.ztt.Transformer +import com.cisco.ztt.meta.Unit +import com.cisco.ztt.Payload + + +class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def inputTopic: String = { unit.input_topic } + + val transformers = unit.xr_telemetry.map( d => { + Holder.logger.warn("Choosing mapper for " + unit.processor) + unit.processor match { + case "timeseries" => d.path -> new TimeseriesMapper(d, unit.timeseries_namespace) + case "inventory" => d.path -> new InventoryMapper(d) + case _ => d.path -> new NullMapper(d.path) + } + }).toMap + + def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = { + Holder.logger.trace(rawEvent.getRawdata()) + + try { + val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src } + val event = JsonParser.parse(rawEvent.getRawdata()) + val path = event.Telemetry.encoding_path + if (transformers.contains(path)) { + Holder.logger.debug("Transforming for " + path) + val datapoints = transformers(path).transform(event) + val payloads = datapoints.map(d => { + new Payload(source, unit.output_topic, + rawEvent.getHostIp, rawEvent.getTimestamp, d) + }) + (true, payloads) + } else { + Holder.logger.trace("No transformer in unit for " + path) + (false, Set[Payload]()) + } + } catch { + case t: Throwable => { + Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage) + (false, Set[Payload]()) + } + } + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala new file mode 100644 index 0000000..6c9ad80 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.xr.telemetry + +import org.apache.log4j.Logger +import com.cisco.ztt.TimeseriesDatapoint +import org.json4s.JsonAST.JObject +import org.json4s.JsonAST.JValue +import com.cisco.ztt.meta.Item +import com.cisco.ztt.meta.Telemetry + +class TimeseriesMapper(config: Telemetry, namespace: String) extends Mapper with Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + val wantedKeys = config.keys.getOrElse(Array[Item]()).map( item => { + item.name -> item + }).toMap + + val wantedValues = config.content.map( item => { + item.name -> item + }).toMap + + def transform(event: TEvent): Set[TimeseriesDatapoint] = { + + val timeseries = event.Rows.flatMap( row => { + val keys = row.Keys + .getOrElse(Map[String, String]()) + .filter(k => wantedKeys.contains(k._1)) + .map( k => { + val wanted = wantedKeys(k._1) + val name = if (wanted.ts_name != null) wanted.ts_name else k._1 + name -> k._2 + }).toMap + ("host" -> event.Telemetry.node_id_str) + + // Flatten nested maps into container.key -> value + val expanded = row.Content.flatMap( v => { + if (v._2.isInstanceOf[JObject]) { + JsonParser.map(v._2).map( kv => { + (v._1 + "." + kv._1) -> kv._2 + }) + } else { + Map[String, JValue](v) + } + }) + + val items = expanded.filter(v => wantedValues.contains(v._1)) + .map( data => { + val wanted = wantedValues(data._1) + val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else data._1) + val value = JsonParser.int(data._2) + val datapoint = new TimeseriesDatapoint(name, value, row.Timestamp, keys) + datapoint + }) + items.toSet + }) + + timeseries.toSet + } + +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala new file mode 100644 index 0000000..84f5bbb --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.meta + +class Metadata(val units: Array[Unit]) extends Serializable { + def topics() : Set[String] = { + val topics = units.map { case (unit : Unit) => { + unit.input_topic + }} + + topics.toSet + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala new file mode 100644 index 0000000..4b72cb1 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.meta + +import org.apache.log4j.Logger +import org.springframework.core.io.support.PathMatchingResourcePatternResolver + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +case class Unit( + format: String, + input_topic: String, + processor: String, + output_topic: String, + publish_src: String, + timeseries_namespace: String, + xr_telemetry: Array[Telemetry], + ves_telemetry: Array[Telemetry]) +case class Telemetry(path: String, keys: Option[Array[Item]], content: Array[Item]) +case class Item(name: String, display_name: String, ts_name: String) + +object YamlReader { + + private[this] val logger = Logger.getLogger(getClass().getName()); + + val mapper: ObjectMapper = new ObjectMapper(new YAMLFactory()) + mapper.registerModule(DefaultScalaModule) + + def load(pattern: String = "classpath*:meta/*.yaml"): Metadata = { + val patternResolver = new PathMatchingResourcePatternResolver() + val mappingLocations = patternResolver.getResources(pattern) + val units = mappingLocations.map(loc => { + logger.info("Reading metadata " + loc) + mapper.readValue(loc.getInputStream, classOf[Unit]) + }); + new Metadata(units) + } + + def parse(yaml: String): Unit = { + val meta: Unit = mapper.readValue(yaml, classOf[Unit]) + meta + } + + def dump(meta: Unit) = { + println(" input = " + meta.input_topic) + println("output = " + meta.output_topic) + meta.xr_telemetry.map(m => { + println(" path = " + m.path) + println("keys:") + m.keys.getOrElse(Array[Item]()).map(item => { + println(" " + item.name + " -> " + item.display_name) + }); + println("content:") + m.content.map(item => { + println(" " + item.name + " -> " + item.display_name) + }); + }); + } +} diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala new file mode 100644 index 0000000..4019262 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.ves.telemetry + +import com.cisco.ztt.meta.Telemetry +import com.cisco.ztt.TimeseriesDatapoint +import org.json4s.JsonAST.JValue +import org.apache.log4j.Logger +import com.cisco.ztt.meta.Item +import org.json4s.JsonAST.JObject + + +class VesMapper(config: Telemetry, namespace: String) extends Serializable { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = { + + val keys = config.keys.getOrElse(Array[Item]()).map( k => { + val value = map.get(k.name).get.toString() + k.name -> value + }).toMap + ("host" -> host) + + val items = config.content.map( wanted => { + val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else wanted.name) + val value = map.get(wanted.name).get.toString() + + new TimeseriesDatapoint(name, value, timestamp, keys) + }) + + items.toSet + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala new file mode 100644 index 0000000..5eaf8f3 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.ves.telemetry + +import com.cisco.ztt.Transformer +import com.cisco.pnda.model.DataPlatformEvent +import com.cisco.ztt.meta.Unit +import com.cisco.ztt.Payload +import org.apache.log4j.Logger +import org.json4s.jackson.JsonMethods +import org.json4s.JsonAST.JValue +import org.json4s.JsonAST.JObject +import com.cisco.ztt.TimeseriesDatapoint +import org.json4s.JsonAST.JArray + + +class VesTransformer(unit: Unit) extends Serializable with Transformer { + + object Holder extends Serializable { + @transient lazy val logger = Logger.getLogger(getClass.getName) + } + + def inputTopic: String = { unit.input_topic } + + val mappers = unit.ves_telemetry.map(d => { + d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace) + }) //.toMap + + def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = { + val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src } + val parsed = JsonMethods.parse(rawEvent.getRawdata) + + if (! parsed.isInstanceOf[JObject]) { + Holder.logger.warn("Cannot process parsed JSON") + return (false, Set[Payload]()) + } + val value = parsed.asInstanceOf[JObject].values.get("event").get.asInstanceOf[Map[String, JValue]] + val header = value.get("commonEventHeader").get.asInstanceOf[Map[String,Any]] + val host = header.get("reportingEntityName").get.toString + val timestamp = header.get("lastEpochMicrosec").get.toString.dropRight(3) + + val generated = mappers.flatMap(r => { + val path = r._1 + val mapper = r._2 + + val datapoints = visit(path, value, mapper, host, timestamp) + datapoints.map(d => { + new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d) + }) + }).toSet + (true, generated) + } + + def visit( + path: Array[String], + map: Map[String, Any], + mapper: VesMapper, + host: String, + timestamp: String): Set[TimeseriesDatapoint] = { + if (path.length > 0) { + val option = map.get(path.head) + option match { + case None => { + Holder.logger.warn("VES mapper failed to dereference JSON " + path.head) + return Set[TimeseriesDatapoint]() + } + + case _ => { + option.get match { + case l: List[_] => { + val list = l.asInstanceOf[List[Map[String, Any]]] + return list.flatMap(sub => { + visit(path.tail, sub, mapper, host, timestamp) + }).toSet + } + case m: Map[_, _] => { + val sub = m.asInstanceOf[Map[String, Any]] + return visit(path.tail, sub, mapper, host, timestamp) + } + + } + } + } + } else { + val datapoints = mapper.transform(map, host, timestamp) + return datapoints + } + + Set[TimeseriesDatapoint]() + } +} diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala new file mode 100644 index 0000000..57339f8 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.github.benfradet.spark.kafka.writer + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord} +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** + * Class used for writing [[DStream]]s to Kafka + * @param dStream [[DStream]] to be written to Kafka + */ +class DStreamKafkaWriter[T: ClassTag](@transient private val dStream: DStream[T]) + extends KafkaWriter[T] with Serializable { + /** + * Write a [[DStream]] to Kafka + * @param producerConfig producer configuration for creating KafkaProducer + * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s + * @param callback an optional [[Callback]] to be called after each write, default value is None. + */ + override def writeToKafka[K, V]( + producerConfig: Map[String, Object], + transformFunc: T => ProducerRecord[K, V], + callback: Option[Callback] = None + ): Unit = + dStream.foreachRDD { rdd => + val rddWriter = new RDDKafkaWriter[T](rdd) + rddWriter.writeToKafka(producerConfig, transformFunc, callback) + } +} diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala new file mode 100644 index 0000000..2fbf6bc --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.github.benfradet.spark.kafka.writer + +import java.util.concurrent.{Callable, ExecutionException, TimeUnit} + +import com.google.common.cache._ +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} +import org.apache.kafka.clients.producer.KafkaProducer + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + +/** Cache of [[KafkaProducer]]s */ +object KafkaProducerCache { + private type ProducerConf = Seq[(String, Object)] + private type ExProducer = KafkaProducer[_, _] + + private val removalListener = new RemovalListener[ProducerConf, ExProducer]() { + override def onRemoval(notif: RemovalNotification[ProducerConf, ExProducer]): Unit = + notif.getValue.close() + } + + private val cacheExpireTimeout = 10.minutes.toMillis + private val cache = CacheBuilder.newBuilder() + .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) + .removalListener(removalListener) + .build[ProducerConf, ExProducer]() + + /** + * Retrieve a [[KafkaProducer]] in the cache or create a new one + * @param producerConfig producer configuration for creating [[KafkaProducer]] + * @return a [[KafkaProducer]] already in the cache or a new one + */ + def getProducer[K, V](producerConfig: Map[String, Object]): KafkaProducer[K, V] = + try { + cache.get(mapToSeq(producerConfig), new Callable[KafkaProducer[K, V]] { + override def call(): KafkaProducer[K, V] = new KafkaProducer[K, V](producerConfig.asJava) + }).asInstanceOf[KafkaProducer[K, V]] + } catch { + case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) + if e.getCause != null => throw e.getCause + } + + /** + * Flush and close the [[KafkaProducer]] in the cache associated with this config + * @param producerConfig producer configuration associated to a [[KafkaProducer]] + */ + def close(producerConfig: Map[String, Object]): Unit = cache.invalidate(mapToSeq(producerConfig)) + + private def mapToSeq(m: Map[String, Object]): Seq[(String, Object)] = m.toSeq.sortBy(_._1) +} diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala new file mode 100644 index 0000000..90d2bb1 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.github.benfradet.spark.kafka.writer + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord} + +import scala.reflect.ClassTag + +/** + * Class used to write DStreams, RDDs and Datasets to Kafka + * + * Example usage: + * {{{ + * import com.github.benfradet.spark.kafka.writer.KafkaWriter._ + * import org.apache.kafka.common.serialization.StringSerializer + * + * val topic = "my-topic" + * val producerConfig = Map( + * "bootstrap.servers" -> "127.0.0.1:9092", + * "key.serializer" -> classOf[StringSerializer].getName, + * "value.serializer" -> classOf[StringSerializer].getName + * ) + * + * val dStream: DStream[String] = ... + * dStream.writeToKafka( + * producerConfig, + * s => new ProducerRecord[String, String](topic, s) + * ) + * + * val rdd: RDD[String] = ... + * rdd.writeToKafka( + * producerConfig, + * s => new ProducerRecord[String, String](localTopic, s) + * ) + * + * val dataset: Dataset[Foo] = ... + * dataset.writeToKafka( + * producerConfig, + * f => new ProducerRecord[String, String](localTopic, f.toString) + * ) + * + * val dataFrame: DataFrame = ... + * dataFrame.writeToKafka( + * producerConfig, + * r => new ProducerRecord[String, String](localTopic, r.get(0).toString) + * ) + * }}} + * It is also possible to provide a callback for each write to Kafka. + * + * This is optional and has a value of None by default. + * + * Example Usage: + * {{{ + * @transient val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer") + * + * val dStream: DStream[String] = ... + * dStream.writeToKafka( + * producerConfig, + * s => new ProducerRecord[String, String](topic, s), + * Some(new Callback with Serializable { + * override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = { + * if (Option(e).isDefined) { + * log.warn("error sending message", e) + * } else { + * log.info(s"write succeeded, offset: ${metadata.offset()") + * } + * } + * }) + * ) + * }}} + */ +abstract class KafkaWriter[T: ClassTag] extends Serializable { + /** + * Write a DStream, RDD, or Dataset to Kafka + * @param producerConfig producer configuration for creating KafkaProducer + * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s + * @param callback an optional [[Callback]] to be called after each write, default value is None. + */ + def writeToKafka[K, V]( + producerConfig: Map[String, Object], + transformFunc: T => ProducerRecord[K, V], + callback: Option[Callback] = None + ): Unit +} diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala new file mode 100644 index 0000000..c6f6133 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.github.benfradet.spark.kafka.writer + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord} +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +/** + * Class used for writing [[RDD]]s to Kafka + * @param rdd [[RDD]] to be written to Kafka + */ +class RDDKafkaWriter[T: ClassTag](@transient private val rdd: RDD[T]) + extends KafkaWriter[T] with Serializable { + /** + * Write a [[RDD]] to Kafka + * @param producerConfig producer configuration for creating KafkaProducer + * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s + * @param callback an optional [[Callback]] to be called after each write, default value is None. + */ + override def writeToKafka[K, V]( + producerConfig: Map[String, Object], + transformFunc: T => ProducerRecord[K, V], + callback: Option[Callback] = None + ): Unit = + rdd.foreachPartition { partition => + val producer = KafkaProducerCache.getProducer[K, V](producerConfig) + partition + .map(transformFunc) + .foreach(record => producer.send(record, callback.orNull)) + } +} diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala new file mode 100644 index 0000000..04f5ba7 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.github.benfradet.spark.kafka + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** Implicit conversions between + * - [[DStream]] -> [[KafkaWriter]] + * - [[RDD]] -> [[KafkaWriter]] + * - [[Dataset]] -> [[KafkaWriter]] + * - [[DataFrame]] -> [[KafkaWriter]] + */ +package object writer { + /** + * Convert a [[DStream]] to a [[KafkaWriter]] implicitly + * @param dStream [[DStream]] to be converted + * @return [[KafkaWriter]] ready to write messages to Kafka + */ + implicit def dStreamToKafkaWriter[T: ClassTag, K, V](dStream: DStream[T]): KafkaWriter[T] = + new DStreamKafkaWriter[T](dStream) + + /** + * Convert a [[RDD]] to a [[KafkaWriter]] implicitly + * @param rdd [[RDD]] to be converted + * @return [[KafkaWriter]] ready to write messages to Kafka + */ + implicit def rddToKafkaWriter[T: ClassTag, K, V](rdd: RDD[T]): KafkaWriter[T] = + new RDDKafkaWriter[T](rdd) + +} diff --git a/pnda-ztt-app/src/test/resources/application.properties b/pnda-ztt-app/src/test/resources/application.properties new file mode 100644 index 0000000..c99a8ad --- /dev/null +++ b/pnda-ztt-app/src/test/resources/application.properties @@ -0,0 +1,8 @@ +app.job_name=collectd.tsdb +kafka.brokers=localhost:9092 +kafka.zookeeper=localhost:2181 +app.checkpoint_path=/tmp +app.processing_parallelism=1 +app.batch_size_seconds=5 +kafka.consume_from_beginning=false +opentsdb.ip=localhost:4242 diff --git a/pnda-ztt-app/src/test/resources/log4j.testing.properties b/pnda-ztt-app/src/test/resources/log4j.testing.properties new file mode 100644 index 0000000..c3b266c --- /dev/null +++ b/pnda-ztt-app/src/test/resources/log4j.testing.properties @@ -0,0 +1,14 @@ +log4j.rootCategory=WARN,console +log4j.logger.org=WARN +log4j.logger.com.cisco.pnda=DEBUG,console +log4j.additivity.com.cisco.pnda=false +log4j.logger.com.cisco.ztt=DEBUG,console +log4j.additivity.com.cisco.ztt=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.immediateFlush=true +log4j.appender.console.encoding=UTF-8 + +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=%d %-5p %c - %m%n diff --git a/pnda-ztt-app/src/test/resources/meta/test-one.yaml b/pnda-ztt-app/src/test/resources/meta/test-one.yaml new file mode 100644 index 0000000..9900043 --- /dev/null +++ b/pnda-ztt-app/src/test/resources/meta/test-one.yaml @@ -0,0 +1,14 @@ +input_topic: telemetry +output_topic: timeseries + +xr_telemetry: + - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters + keys: + - name: interface-name + display_name: "Interface Name" + content: + - name: bytes-received + display_name: "Bytes Received" + + - name: bytes-sent + display_name: "Bytes Sent" diff --git a/pnda-ztt-app/src/test/resources/meta/test-three.yaml b/pnda-ztt-app/src/test/resources/meta/test-three.yaml new file mode 100644 index 0000000..9900043 --- /dev/null +++ b/pnda-ztt-app/src/test/resources/meta/test-three.yaml @@ -0,0 +1,14 @@ +input_topic: telemetry +output_topic: timeseries + +xr_telemetry: + - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters + keys: + - name: interface-name + display_name: "Interface Name" + content: + - name: bytes-received + display_name: "Bytes Received" + + - name: bytes-sent + display_name: "Bytes Sent" diff --git a/pnda-ztt-app/src/test/resources/meta/test-two.yaml b/pnda-ztt-app/src/test/resources/meta/test-two.yaml new file mode 100644 index 0000000..9900043 --- /dev/null +++ b/pnda-ztt-app/src/test/resources/meta/test-two.yaml @@ -0,0 +1,14 @@ +input_topic: telemetry +output_topic: timeseries + +xr_telemetry: + - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters + keys: + - name: interface-name + display_name: "Interface Name" + content: + - name: bytes-received + display_name: "Bytes Received" + + - name: bytes-sent + display_name: "Bytes Sent" diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala new file mode 100644 index 0000000..c08ae2f --- /dev/null +++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/JsonParserTest.scala @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.telemetry.xr + +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.json4s.JsonAST.JArray +import com.cisco.ztt.cisco.xr.telemetry.JsonParser + +class JsonParserTest extends FlatSpec with Matchers { + val cpu_json = """ +{ + "Source": "172.16.1.157:27059", + "Telemetry": { + "node_id_str": "IOS-XRv9k-edge-1", + "subscription_id_str": "cpu", + "encoding_path": "Cisco-IOS-XR-wdsysmon-fd-oper:system-monitoring/cpu-utilization", + "collection_id": 265673, + "collection_start_time": 1505905434090, + "msg_timestamp": 1505905434090, + "collection_end_time": 1505905434103 + }, + "Rows": [ + { + "Timestamp": 1505905434099, + "Keys": { + "node-name": "0/RP0/CPU0" + }, + "Content": { + "process-cpu_PIPELINE_EDIT": [ + { + "process-cpu-fifteen-minute": 0, + "process-cpu-five-minute": 0, + "process-cpu-one-minute": 0, + "process-id": 1, + "process-name": "init" + }, + { + "process-cpu-fifteen-minute": 0, + "process-cpu-five-minute": 0, + "process-cpu-one-minute": 0, + "process-id": 1544, + "process-name": "bash" + }, + { + "process-cpu-fifteen-minute": 0, + "process-cpu-five-minute": 0, + "process-cpu-one-minute": 0, + "process-id": 26436, + "process-name": "sleep" + } + ], + "total-cpu-fifteen-minute": 6, + "total-cpu-five-minute": 6, + "total-cpu-one-minute": 6 + } + } + ] +} + """ + + "JsonParser" should "successfully parse cpu telemetry JSON" in { + val event = JsonParser.parse(cpu_json) + + event.Telemetry.subscription_id_str should be ("cpu") + event.Rows(0).Keys.size should be (1) + + val subrows = event.Rows(0).Content("process-cpu_PIPELINE_EDIT") + val extracted = JsonParser.array(subrows) + + extracted.size should be (3) + extracted(0).size should be (5) + } + + val null_keys = """ +{ + "Source": "172.16.1.157:49227", + "Telemetry": { + "node_id_str": "IOS-XRv9k-edge-1", + "subscription_id_str": "Logging", + "encoding_path": "Cisco-IOS-XR-infra-syslog-oper:syslog/logging-statistics", + "collection_id": 925712, + "collection_start_time": 1507552918199, + "msg_timestamp": 1507552918199, + "collection_end_time": 1507552918203 + }, + "Rows": [ + { + "Timestamp": 1507552918201, + "Keys": null, + "Content": { + "buffer-logging-stats": { + "buffer-size": 2097152, + "is-log-enabled": "true", + "message-count": 221, + "severity": "message-severity-debug" + } + } + } + ] +} + """ + + it should "successfully parse JSON with null keys" in { + val event = JsonParser.parse(null_keys) + + event.Rows(0).Keys.size should be (0) + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala new file mode 100644 index 0000000..b133c9e --- /dev/null +++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/cisco/telemetry/xr/TelemetryMapperTest.scala @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.cisco.telemetry.xr + +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import com.cisco.ztt.cisco.xr.telemetry.TimeseriesMapper +import com.cisco.ztt.meta.YamlReader +import com.cisco.ztt.cisco.xr.telemetry.JsonParser + +class TelemetryMapperTest extends FlatSpec with Matchers { + + val ipv6_yaml = """ +input_topic: telemetry.avro +output_topic: timeseries +timeseries_namespace: ipv6 + +xr_telemetry: + - path: Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic + keys: + - name: node-name + display_name: "Node Name" + content: + - name: ipv6.total-packets + display_name: "Total IPV6 Packets" + ts_name: total-ipv6-packets + + - name: icmp.total-messages + display_name: "Total ICMP Messages" +""" + + val ipv6_json = """ +{ + "Source": "172.16.1.157:56197", + "Telemetry": { + "node_id_str": "IOS-XRv9k-edge-1", + "subscription_id_str": "ipv6-io", + "encoding_path": "Cisco-IOS-XR-ipv6-io-oper:ipv6-io/nodes/node/statistics/traffic", + "collection_id": 282962, + "collection_start_time": 1506008887021, + "msg_timestamp": 1506008887021, + "collection_end_time": 1506008887039 + }, + "Rows": [ + { + "Timestamp": 1506008887031, + "Keys": { + "node-name": "0/RP0/CPU0" + }, + "Content": { + "icmp": { + "output-messages": 0, + "total-messages": 4, + "unknown-error-type-messages": 0 + }, + "ipv6": { + "bad-header-packets": 0, + "total-packets": 12, + "unknown-protocol-packets": 0 + }, + "ipv6-node-discovery": { + "received-redirect-messages": 0, + "sent-redirect-messages": 0 + } + } + } + ] +} +""" + + "TelemetryMapper" should "Map to wanted timeseries values" in { + val meta = YamlReader.parse(ipv6_yaml) + val mapper = new TimeseriesMapper(meta.xr_telemetry(0), "demo") + + val event = JsonParser.parse(ipv6_json) + val timeseries = mapper.transform(event).toList + + timeseries.length should be (2) + timeseries(0).metric should be ("demo.icmp.total-messages") + timeseries(1).metric should be ("demo.total-ipv6-packets") + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala new file mode 100644 index 0000000..2ee1749 --- /dev/null +++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/DatapointTest.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.meta + +import org.scalatest.Matchers +import org.scalatest.FlatSpec +import com.cisco.ztt.TimeseriesDatapoint +import java.io.StringWriter +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +class DatapointTest extends FlatSpec with Matchers { + + "TimeseriesDatapoint" should "serialize to json" in { + val data = Array( + new TimeseriesDatapoint("packets-in", "5", "1000000", + Map("host" -> "host1", "inteface" -> "GigabitEthernet0/0/0/1")), + new TimeseriesDatapoint("packets-out", "5", "1000000", + Map("host" -> "host1", "inteface" -> "GigabitEthernet0/0/0/1")) + ) + + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + + val out = new StringWriter + mapper.writeValue(out, data) + val json = out.toString() + } + +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala new file mode 100644 index 0000000..eabe54e --- /dev/null +++ b/pnda-ztt-app/src/test/scala/com/cisco/ztt/meta/YamlReaderTest.scala @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cisco.ztt.meta + +import org.scalatest.FlatSpec +import org.scalatest.Matchers + +class YamlReaderTest extends FlatSpec with Matchers { + + val interface_yaml = """ +input_topic: telemetry +output_topic: timeseries + +xr_telemetry: + - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters + keys: + - name: interface-name + display_name: "Interface Name" + + content: + - name: bytes-received + display_name: "Bytes Received" + + - name: bytes-sent + display_name: "Bytes Sent" + + - name: packets-received + display_name: "Packets Received" + + - name: packets-sent + display_name: "Packets Sent" +""" + + "YamlReader" should "successfully parse YAML text" in { + val meta = YamlReader.parse(interface_yaml) + + meta.input_topic should be ("telemetry") + meta.output_topic should be ("timeseries") + meta.xr_telemetry.length should be (1) + meta.xr_telemetry(0).keys.get.length should be(1) + meta.xr_telemetry(0).content.length should be(4) + } + + it should "read multiple files from classpath" in { + val meta = YamlReader.load("classpath*:meta/test-*.yaml") + meta.units.length should be (3) + } +} diff --git a/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala new file mode 100644 index 0000000..58f87fe --- /dev/null +++ b/pnda-ztt-app/src/test/scala/ves/telemetry/VesTransformerTest.scala @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ves.telemetry + +import org.scalatest.BeforeAndAfter +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import com.cisco.pnda.model.DataPlatformEvent +import com.cisco.ztt.meta.YamlReader +import com.cisco.ztt.ves.telemetry.VesTransformer +import org.apache.log4j.BasicConfigurator + +class VesTransformerTest extends FlatSpec with Matchers with BeforeAndAfter { + + before { + BasicConfigurator.configure(); + } + + val yaml = """ +format: ves +input_topic: ves.avro +processor: timeseries +output_topic: timeseries +timeseries_namespace: nic + +ves_telemetry: + - path: measurementsForVfScalingFields/vNicUsageArray + keys: + - name: vNicIdentifier + content: + - name: receivedTotalPacketsDelta +""" + val payload = """{ + "event": { + "commonEventHeader": { + "startEpochMicrosec": 1537258053361276, + "sourceId": "bab0c4de-cfb8-4a0d-b7e4-a3d4020bb667", + "eventId": "TrafficStats_1.2.3.4", + "nfcNamingCode": "vVNF", + "reportingEntityId": "No UUID available", + "internalHeaderFields": { + "collectorTimeStamp": "Tue, 09 18 2018 08:01:41 GMT" + }, + "eventType": "HTTP request rate", + "priority": "Normal", + "version": 3, + "reportingEntityName": "fwll", + "sequence": 6108, + "domain": "measurementsForVfScaling", + "lastEpochMicrosec": 1537258063557408, + "eventName": "vFirewallBroadcastPackets", + "sourceName": "vFW_SINK_VNF2", + "nfNamingCode": "vVNF" + }, + "measurementsForVfScalingFields": { + "cpuUsageArray": [ + { + "percentUsage": 0, + "cpuIdentifier": "cpu1", + "cpuIdle": 100, + "cpuUsageSystem": 0, + "cpuUsageUser": 0 + } + ], + "measurementInterval": 10, + "requestRate": 9959, + "vNicUsageArray": [ + { + "transmittedOctetsDelta": 0, + "receivedTotalPacketsDelta": 0, + "vNicIdentifier": "eth0", + "valuesAreSuspect": "true", + "transmittedTotalPacketsDelta": 0, + "receivedOctetsDelta": 0 + } + ], + "measurementsForVfScalingVersion": 2.1 + } + } +} +""" + "VesTransformer" should "successfully transform a VES event" in { + val unit = YamlReader.parse(yaml) + val ves = new VesTransformer(unit) + + val event = new DataPlatformEvent("src", System.currentTimeMillis(), "127.0.0.1", payload) + val (status, result) = ves.transform(event) + + status should be(true) + result.toList.length should be (1) + result.toList(0).datapoint.metric should be ("nic.receivedTotalPacketsDelta") + } +}
\ No newline at end of file diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore new file mode 100644 index 0000000..d392f0e --- /dev/null +++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/.gitignore @@ -0,0 +1 @@ +*.jar diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties new file mode 100644 index 0000000..8c1b965 --- /dev/null +++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/application.properties @@ -0,0 +1,8 @@ +kafka.brokers=${environment_kafka_brokers} +kafka.zookeeper=${environment_kafka_zookeeper} +app.checkpoint_path=${component_checkpoint_path} +app.job_name=${component_job_name} +app.processing_parallelism=${component_processing_parallelism} +app.batch_size_seconds=${component_batch_size_seconds} +kafka.consume_from_beginning=${component_consume_from_beginning} +opentsdb.ip=${environment_opentsdb}
\ No newline at end of file diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties new file mode 100644 index 0000000..6a3cab1 --- /dev/null +++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/log4j.properties @@ -0,0 +1,13 @@ +log4j.rootLogger=ERROR,rolling +log4j.logger.com.cisco.pnda=${component.log_level},rolling +log4j.additivity.com.cisco.pnda=false +log4j.logger.com.cisco.ztt=DEBUG,rolling +log4j.additivity.com.cisco.ztt=false + +log4j.appender.rolling=org.apache.log4j.RollingFileAppender +log4j.appender.rolling.layout=org.apache.log4j.PatternLayout +log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n +log4j.appender.rolling.maxFileSize=50MB +log4j.appender.rolling.maxBackupIndex=1 +log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/spark.log +log4j.appender.rolling.encoding=UTF-8 diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json new file mode 100644 index 0000000..1029987 --- /dev/null +++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/opentsdb.json @@ -0,0 +1,8 @@ +[ + { + "name": "nic.receivedTotalPacketsDelta" + }, + { + "name": "cpu.percentUsage" + } +]
\ No newline at end of file diff --git a/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json new file mode 100644 index 0000000..2f8ab6a --- /dev/null +++ b/pnda-ztt-app/src/universal/sparkStreaming/PndaZTTApp/properties.json @@ -0,0 +1,10 @@ +{ + "main_jar": "PndaZTTApp.jar", + "main_class": "com.cisco.ztt.App", + "log_level": "INFO", + "batch_size_seconds" : "2", + "processing_parallelism" : "1", + "checkpoint_path": "", + "input_topic": "ves.avro", + "consume_from_beginning": "false" +} |