summaryrefslogtreecommitdiffstats
path: root/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves
diff options
context:
space:
mode:
authorchenxdu <chenxdu@cisco.com>2018-09-19 16:42:15 +0200
committerDonald Hunter <donaldh@cisco.com>2018-10-01 10:23:50 +0100
commitfd1060d4c176272f312fb23495ff8cdbebc121ae (patch)
tree2f24090ec71e47e69bd392918198745d0c8406e8 /pnda-ztt-app/src/main/scala/com/cisco/ztt/ves
parenta789d153737a991c14c7be03ae9044563573e4d2 (diff)
PNDA Telemetry app for virtual firwall use case
The telemetry app ingests virtual firewall VES events into HDFS and the timeseries datastore to support futher analytics. Change-Id: I3a0920d4b416c1c165311ab9ff0fc31d8c96499f Signed-off-by: chenxdu <chenxdu@cisco.com> Issue-ID: DCAEGEN2-632 Signed-off-by: Donald Hunter <donaldh@cisco.com>
Diffstat (limited to 'pnda-ztt-app/src/main/scala/com/cisco/ztt/ves')
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala48
-rw-r--r--pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala104
2 files changed, 152 insertions, 0 deletions
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
new file mode 100644
index 0000000..4019262
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.meta.Telemetry
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JValue
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Item
+import org.json4s.JsonAST.JObject
+
+
+class VesMapper(config: Telemetry, namespace: String) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = {
+
+ val keys = config.keys.getOrElse(Array[Item]()).map( k => {
+ val value = map.get(k.name).get.toString()
+ k.name -> value
+ }).toMap + ("host" -> host)
+
+ val items = config.content.map( wanted => {
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else wanted.name)
+ val value = map.get(wanted.name).get.toString()
+
+ new TimeseriesDatapoint(name, value, timestamp, keys)
+ })
+
+ items.toSet
+ }
+} \ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
new file mode 100644
index 0000000..5eaf8f3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.Transformer
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+import org.apache.log4j.Logger
+import org.json4s.jackson.JsonMethods
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.JObject
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JArray
+
+
+class VesTransformer(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
+ val mappers = unit.ves_telemetry.map(d => {
+ d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace)
+ }) //.toMap
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val parsed = JsonMethods.parse(rawEvent.getRawdata)
+
+ if (! parsed.isInstanceOf[JObject]) {
+ Holder.logger.warn("Cannot process parsed JSON")
+ return (false, Set[Payload]())
+ }
+ val value = parsed.asInstanceOf[JObject].values.get("event").get.asInstanceOf[Map[String, JValue]]
+ val header = value.get("commonEventHeader").get.asInstanceOf[Map[String,Any]]
+ val host = header.get("reportingEntityName").get.toString
+ val timestamp = header.get("lastEpochMicrosec").get.toString.dropRight(3)
+
+ val generated = mappers.flatMap(r => {
+ val path = r._1
+ val mapper = r._2
+
+ val datapoints = visit(path, value, mapper, host, timestamp)
+ datapoints.map(d => {
+ new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ }).toSet
+ (true, generated)
+ }
+
+ def visit(
+ path: Array[String],
+ map: Map[String, Any],
+ mapper: VesMapper,
+ host: String,
+ timestamp: String): Set[TimeseriesDatapoint] = {
+ if (path.length > 0) {
+ val option = map.get(path.head)
+ option match {
+ case None => {
+ Holder.logger.warn("VES mapper failed to dereference JSON " + path.head)
+ return Set[TimeseriesDatapoint]()
+ }
+
+ case _ => {
+ option.get match {
+ case l: List[_] => {
+ val list = l.asInstanceOf[List[Map[String, Any]]]
+ return list.flatMap(sub => {
+ visit(path.tail, sub, mapper, host, timestamp)
+ }).toSet
+ }
+ case m: Map[_, _] => {
+ val sub = m.asInstanceOf[Map[String, Any]]
+ return visit(path.tail, sub, mapper, host, timestamp)
+ }
+
+ }
+ }
+ }
+ } else {
+ val datapoints = mapper.transform(map, host, timestamp)
+ return datapoints
+ }
+
+ Set[TimeseriesDatapoint]()
+ }
+}