Diffstat (limited to 'pnda-ztt-app/src/main/scala/com')
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala  93
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java  106
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java  106
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java  54
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala  63
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java  63
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala  81
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala  70
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala  110
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala  25
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala  24
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala  35
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala  24
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala  99
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala  32
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala  62
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala  22
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala  32
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala  70
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala  76
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala  26
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala  74
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala  48
-rw-r--r--  pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala  104
-rw-r--r--  pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala  50
-rw-r--r--  pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala  71
-rw-r--r--  pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala  103
-rw-r--r--  pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala  52
-rw-r--r--  pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala  52
29 files changed, 1827 insertions, 0 deletions
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
new file mode 100644
index 0000000..b143b62
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: StatReporter
+ * Purpose: Report batch processing metrics to the PNDA metric logger
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.pnda
+
+import scala.util.control.NonFatal
+import java.io.StringWriter
+import java.io.PrintWriter
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
+import org.apache.log4j.Logger
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+
+class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
+
+ private[this] val logger = Logger.getLogger(getClass().getName())
+
+ override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
+ def doSend(metricName: String, metricValue: String) = {
+ try {
+ val httpClient = new DefaultHttpClient()
+ val post = new HttpPost(metricsUrl)
+ post.setHeader("Content-type", "application/json")
+ val ts = java.lang.System.currentTimeMillis()
+ val body = f"""{
+ | "data": [{
+ | "source": "application.$appName",
+ | "metric": "application.kpi.$appName.$metricName",
+ | "value": "$metricValue",
+ | "timestamp": $ts%d
+ | }],
+ | "timestamp": $ts%d
+ |}""".stripMargin
+
+ logger.debug(body)
+ post.setEntity(new StringEntity(body))
+ val response = httpClient.execute(post)
+ if (response.getStatusLine.getStatusCode() != 200) {
+ logger.error("POST failed: " + metricsUrl + " response: " + response.getStatusLine.getStatusCode())
+ }
+
+ } catch {
+ case NonFatal(t) => {
+ logger.error("POST failed: " + metricsUrl)
+ val sw = new StringWriter
+ t.printStackTrace(new PrintWriter(sw))
+ logger.error(sw.toString)
+ }
+ }
+ }
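+ // processingDelay and schedulingDelay are Options; Spark fills them in for completed batches, so .get is assumed safe here.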
+ doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
+ doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
+ doSend("num-records", batch.batchInfo.numRecords.toString())
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
new file mode 100644
index 0000000..1a08bd4
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEvent
+ * Purpose: Data model class for an avro event on Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class DataPlatformEvent implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+ protected static ObjectMapper _mapper = new ObjectMapper();
+
+ private String _src;
+ private Long _timestamp;
+ private String _hostIp;
+ private String _rawdata;
+
+ public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata)
+ {
+ _src = src;
+ _timestamp = timestamp;
+ _hostIp = host_ip;
+ _rawdata = rawdata;
+ }
+
+ public String getSrc()
+ {
+ return _src;
+ }
+
+ public Long getTimestamp()
+ {
+ return _timestamp;
+ }
+
+ public String getHostIp()
+ {
+ return _hostIp;
+ }
+
+ public String getRawdata()
+ {
+ return _rawdata;
+ }
+
+ @Override
+ public String toString()
+ {
+ try
+ {
+ return _mapper.writeValueAsString(this);
+ }
+ catch (Exception ex)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
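+ // Null-safe field-by-field comparison: == covers identity and the both-null case, equals covers the rest.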
+ boolean result = false;
+ if (other instanceof DataPlatformEvent)
+ {
+ DataPlatformEvent that = (DataPlatformEvent) other;
+ result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc())))
+ && (this.getTimestamp() == that.getTimestamp() || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
+ && (this.getHostIp() == that.getHostIp() || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp())))
+ && (this.getRawdata() == that.getRawdata() || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata())));
+ }
+ return result;
+
+ }
+
+ public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException
+ {
+ return _mapper.readTree(_rawdata);
+ }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
new file mode 100644
index 0000000..ef2bc0d
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEventCodec
+ * Purpose: Encodes and decodes binary event data from Kafka to/from a DataPlatformEvent object
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.log4j.Logger;
+
+
+public class DataPlatformEventCodec
+{
+ private static final Logger LOGGER = Logger.getLogger(DataPlatformEventCodec.class.getName());
+
+ private Schema.Parser _parser = new Schema.Parser();
+ private DatumReader<GenericRecord> _reader;
+ private DatumWriter<GenericRecord> _writer;
+ private Schema _schema;
+ private String _schemaDef;
+
+ public DataPlatformEventCodec(String schemaDef) throws IOException
+ {
+ _schemaDef = schemaDef;
+ _schema = _parser.parse(schemaDef);
+ _reader = new GenericDatumReader<GenericRecord>(_schema);
+ _writer = new GenericDatumWriter<GenericRecord>(_schema);
+ }
+
+ public DataPlatformEvent decode(byte[] data) throws IOException
+ {
+ try
+ {
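+ // Binary-decode the avro payload into a GenericRecord using the dataplatform schema.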
+ Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+ GenericRecord r = _reader.read(null, decoder);
+ return new DataPlatformEvent(r.get("src").toString(),
+ (Long) r.get("timestamp"),
+ r.get("host_ip").toString(),
+ new String(((ByteBuffer) r.get("rawdata")).array(), "UTF-8"));
+ }
+ catch(Exception ex)
+ {
+ LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
+ throw new IOException(ex);
+ }
+ }
+
+ protected static final char[] hexArray = "0123456789ABCDEF".toCharArray();
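+ // Hex-encodes a payload so events that fail to decode can be inspected in the error log above.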
+ public static String hexStr(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for ( int j = 0; j < bytes.length; j++ ) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = hexArray[v >>> 4];
+ hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
+
+ public byte[] encode(DataPlatformEvent e) throws IOException
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ GenericRecord datum = new GenericData.Record(_schema);
+ datum.put("src", e.getSrc());
+ datum.put("timestamp", e.getTimestamp());
+ datum.put("host_ip", e.getHostIp());
+ datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
+ _writer.write(datum, encoder);
+ encoder.flush();
+ out.close();
+ return out.toByteArray();
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
new file mode 100644
index 0000000..24a9d35
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: StaticHelpers
+ * Purpose: Helper functions
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class StaticHelpers {
+ public static String loadResourceFile(String path)
+ {
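+ // Reads a classpath resource as UTF-8 text; returns null if the resource is missing or unreadable.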
+ InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
+ try
+ {
+ char[] buf = new char[2048];
+ Reader r = new InputStreamReader(is, "UTF-8");
+ StringBuilder s = new StringBuilder();
+ while (true)
+ {
+ int n = r.read(buf);
+ if (n < 0)
+ break;
+ s.append(buf, 0, n);
+ }
+ return s.toString();
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
new file mode 100644
index 0000000..6bc2083
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import com.cisco.pnda.StatReporter
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.YamlReader
+import org.apache.log4j.BasicConfigurator
+
+object App {
+
+ private[this] val logger = Logger.getLogger(getClass().getName())
+
+ def main(args: Array[String]): Unit = {
+
+ BasicConfigurator.configure();
+
+ val props = AppConfig.loadProperties();
+ val loggerUrl = props.getProperty("environment.metric_logger_url")
+ val appName = props.getProperty("component.application")
+ val checkpointDirectory = props.getProperty("app.checkpoint_path");
+ val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+ val metadata = YamlReader.load()
+ if (metadata.units.length == 0) {
+ logger.error("Trying to run app without metadata")
+ System.exit(1)
+ }
+ val pipeline = new ZttPipeline(metadata)
+
+ // Create the streaming context, or load a saved one from disk
+ val ssc = if (checkpointDirectory.length() > 0)
+ StreamingContext.getOrCreate(checkpointDirectory, pipeline.create) else pipeline.create();
+
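+ // Stop the streaming context cleanly on JVM shutdown so in-flight batches can complete.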
+ sys.ShutdownHookThread {
+ logger.info("Gracefully stopping Spark Streaming Application")
+ ssc.stop(true, true)
+ logger.info("Application stopped")
+ }
+
+ if (loggerUrl != null) {
+ logger.info("Reporting stats to url: " + loggerUrl)
+ ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+ }
+ logger.info("Starting spark streaming execution")
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
new file mode 100644
index 0000000..a711cab
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: AppConfig
+ * Purpose: Load application properties file.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.ztt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class AppConfig
+{
+ private static final Logger LOGGER = Logger.getLogger(AppConfig.class);
+
+ private static Properties _properties;
+ private static Object _lock = new Object();
+
+ public static Properties loadProperties()
+ {
+ synchronized (_lock)
+ {
+ if (_properties == null)
+ {
+ _properties = new Properties();
+ try
+ {
+ InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties");
+ _properties.load(is);
+ is.close();
+ LOGGER.info("Properties loaded");
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Failed to load properties", e);
+ System.exit(1);
+ }
+ }
+ return _properties;
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
new file mode 100644
index 0000000..c0bc61e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: KafkaInput
+ * Purpose: Generate a dstream from Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.StringDecoder
+import org.apache.log4j.Logger
+
+class KafkaInput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def readFromKafka(ssc: StreamingContext, topic: String) = {
+ val props = AppConfig.loadProperties();
+ val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+ if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
+ kafkaParams.put("auto.offset.reset", "smallest");
+ }
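+ // "smallest" is the Kafka 0.8 consumer equivalent of "earliest" in newer clients.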
+
+ Holder.logger.info("Registering with kafka using broker " + kafkaParams("metadata.broker.list"))
+ Holder.logger.info("Registering with kafka using topic " + topic)
+
+ val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
+ ssc, kafkaParams.toMap, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
+
+ // Decode avro container format
+ val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+ val rawMessages = messages.map(x => {
+ val eventDecoder = new DataPlatformEventCodec(avroSchemaString);
+ val payload = x._2;
+ val dataPlatformEvent = eventDecoder.decode(payload);
+ dataPlatformEvent;
+ });
+ rawMessages;
+ };
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
new file mode 100644
index 0000000..1041d00
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.github.benfradet.spark.kafka.writer.dStreamToKafkaWriter
+
+
+class KafkaOutput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def writeToKafka(output: DStream[Payload]) = {
+
+ val props = AppConfig.loadProperties();
+ val producerConfig = Map(
+ "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+ "key.serializer" -> classOf[StringSerializer].getName,
+ "value.serializer" -> classOf[ByteArraySerializer].getName
+)
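+ // For each payload: serialize the datapoint to JSON, wrap it in the PNDA avro envelope, and publish it to the unit's output topic.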
+ output.writeToKafka(
+ producerConfig,
+ s => {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, s.datapoint)
+ val json = out.toString()
+
+ val event = new DataPlatformEvent(s.publishSrc, s.timestamp, s.hostIp, json)
+ val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
+ val codec = new DataPlatformEventCodec(avroSchemaString);
+
+ new ProducerRecord[String, Array[Byte]](s.publishTopic, codec.encode(event))
+ }
+ )
+ }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
new file mode 100644
index 0000000..9077986
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: OpenTSDBOutput
+ * Purpose: Write a dstream to OpenTSDB
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import scala.Iterator
+
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import scala.collection.mutable.ArrayBuffer
+
+class OpenTSDBOutput extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def putOpentsdb[T](opentsdbIP: String,
+ input: DStream[Payload]): DStream[Payload] = {
+ input.mapPartitions(partition => {
+ var size = 0
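+ // Datapoints are posted to OpenTSDB in batches of 20 per /api/put request;
+ // the payloads themselves pass through unchanged so the Kafka output can reuse the stream.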
+ val output = partition.grouped(20).flatMap(group => {
+ val data = ArrayBuffer[TimeseriesDatapoint]()
+ val passthru = group.map(item => {
+ data += item.datapoint
+ item
+ })
+
+ size += data.length
+
+ if (data.length > 0) {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+
+ val out = new StringWriter
+ mapper.writeValue(out, data)
+ val json = out.toString()
+
+ Holder.logger.debug("Posting " + data.length + " datapoints to OpenTSDB")
+ Holder.logger.debug(json)
+
+ if (opentsdbIP != null && opentsdbIP.length() > 0) {
+ val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
+ try {
+ val httpClient = new DefaultHttpClient()
+ val post = new HttpPost(openTSDBUrl)
+ post.setHeader("Content-type", "application/json")
+ post.setEntity(new StringEntity(json))
+ val response = httpClient.execute(post)
+ // Holder.logger.debug(EntityUtils.toString(response.getEntity()))
+ } catch {
+ case t: Throwable => {
+ Holder.logger.warn(t)
+ }
+ }
+ }
+ } else {
+ Holder.logger.debug("No datapoints to post to OpenTSDB")
+ }
+ passthru
+ })
+ output
+ });
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
new file mode 100644
index 0000000..2961bd0
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class Payload(
+ val publishSrc: String,
+ val publishTopic: String,
+ val hostIp: String,
+ val timestamp: Long,
+ val datapoint: TimeseriesDatapoint) {
+
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
new file mode 100644
index 0000000..7e447bf
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+class TimeseriesDatapoint(
+ val metric: String,
+ val value: String,
+ val timestamp: String,
+ val tags: Map[String, String]) {
+
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
new file mode 100644
index 0000000..aae1e8c
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+import com.cisco.ztt.meta.Metadata
+import com.cisco.ztt.cisco.xr.telemetry.TelemetryDispatcher
+import com.cisco.ztt.ves.telemetry.VesTransformer
+
+class TransformManager(metadata: Metadata) {
+
+ val transformers: Array[Transformer] = metadata.units.map( unit => {
+
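+ // Choose a transformer implementation from the declared input format; the XR telemetry dispatcher is also the fallback.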
+ unit.format match {
+ case "ves" => new VesTransformer(unit)
+ case "cisco.xr.telemetry" => new TelemetryDispatcher(unit)
+ case _ => new TelemetryDispatcher(unit)
+ }
+ })
+
+ val byTopic = transformers.groupBy( t => { t.inputTopic })
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
new file mode 100644
index 0000000..861eb5e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import com.cisco.pnda.model.DataPlatformEvent
+import org.apache.spark.streaming.dstream.DStream
+
+trait Transformer {
+ def inputTopic: String
+ def transform(event: DataPlatformEvent): (Boolean, Set[Payload])
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
new file mode 100644
index 0000000..c3b3532
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: ZttPipeline
+ * Purpose: Set up the spark streaming processing graph.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+import com.cisco.ztt.meta.Metadata
+import com.cisco.pnda.model.DataPlatformEvent
+
+class ZttPipeline(metadata: Metadata) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def create() = {
+ val props = AppConfig.loadProperties();
+ val checkpointDirectory = props.getProperty("app.checkpoint_path");
+ val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
+
+ val sparkConf = new SparkConf();
+ Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
+ val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds));
+
+ if (checkpointDirectory.length() > 0) {
+ ssc.checkpoint(checkpointDirectory)
+ }
+
+ val transformManager = new TransformManager(metadata)
+ val streams = transformManager.byTopic.map( x => {
+ val topic = x._1
+ val transformers = x._2
+
+ val inputStream = new KafkaInput().readFromKafka(ssc, topic)
+
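+ // Run every transformer registered for this topic over each event, tracking whether any of them handled it.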
+ val timeseriesStream = inputStream.flatMap(dataPlatformEvent => {
+ var handled = false;
+ val datapoints = transformers.flatMap(transformer => {
+ val (ran, data) = transformer.transform(dataPlatformEvent)
+ handled |= ran;
+ data;
+ })
+ if (!handled) {
+ Holder.logger.info("Did not process " + dataPlatformEvent.getRawdata)
+ }
+ datapoints
+ })
+
+ val outputStream =
+ new OpenTSDBOutput().putOpentsdb(
+ props.getProperty("opentsdb.ip"),
+ timeseriesStream);
+
+ new KafkaOutput().writeToKafka(outputStream)
+ });
+
+ ssc;
+ }: StreamingContext
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
new file mode 100644
index 0000000..00470ff
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Telemetry
+
+class InventoryMapper(config: Telemetry) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+ Holder.logger.debug("InventoryMapper is sinking " + config.path)
+ Set[TimeseriesDatapoint]()
+ }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
new file mode 100644
index 0000000..97f4da3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.JInt
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods
+import org.json4s.jvalue2extractable
+import org.json4s.string2JsonInput
+import scala.reflect.ManifestFactory.arrayType
+import scala.reflect.ManifestFactory.classType
+import org.json4s.JsonAST.JObject
+
+
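+// Shape of a decoded IOS-XR telemetry JSON message: a collector header plus rows of keys and content.
+// A minimal hypothetical example (values invented for illustration):
+//   { "Source": "10.0.0.1:12345",
+//     "Telemetry": { "node_id_str": "router1", "encoding_path": "...", "collection_id": 1, ... },
+//     "Rows": [ { "Timestamp": "1512345678000",
+//                 "Keys": { "interface-name": "GigabitEthernet0/0/0/0" },
+//                 "Content": { "packets-received": 100 } } ] }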
+case class TRow(Timestamp: String, Keys: Option[Map[String, String]], Content: Map[String, JValue])
+case class THeader(node_id_str: String, subscription_id_str: String,
+ encoding_path: String, collection_id: Int, collection_start_time: Int,
+ msg_timestamp: Int, collection_end_time: Int)
+case class TEvent(Source: String, Telemetry: THeader, Rows: Array[TRow])
+
+
+object JsonParser {
+
+ def parse(json: String): TEvent = {
+ implicit val formats = DefaultFormats
+
+ val parsed = JsonMethods.parse(json)
+ val event = parsed.extract[TEvent]
+ event
+ }
+
+ def array(value: JValue): Array[Map[String,JValue]] = {
+ implicit val formats = DefaultFormats
+ val array = value.asInstanceOf[JArray]
+ array.extract[Array[Map[String, JValue]]]
+ }
+
+ def map(value: JValue): Map[String,JValue] = {
+ implicit val formats = DefaultFormats
+ val map = value.asInstanceOf[JObject]
+ map.extract[Map[String, JValue]]
+ }
+
+ def int(value: JValue): String = {
+ value.asInstanceOf[JInt].num.toString
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
new file mode 100644
index 0000000..2983e88
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+
+trait Mapper {
+ def transform(event: TEvent): Set[TimeseriesDatapoint]
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
new file mode 100644
index 0000000..9f9c775
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import com.cisco.ztt.TimeseriesDatapoint
+import org.apache.log4j.Logger
+
+
+class NullMapper(path: String) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+ Holder.logger.debug("NullMapper is sinking " + path)
+ Set[TimeseriesDatapoint]()
+ }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
new file mode 100644
index 0000000..b7deadc
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.TimeseriesDatapoint
+import com.cisco.ztt.Transformer
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+
+
+class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
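+ // Build an encoding-path -> Mapper table from the unit metadata; the processor field selects the mapper type.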
+ val transformers = unit.xr_telemetry.map( d => {
+ Holder.logger.warn("Choosing mapper for " + unit.processor)
+ unit.processor match {
+ case "timeseries" => d.path -> new TimeseriesMapper(d, unit.timeseries_namespace)
+ case "inventory" => d.path -> new InventoryMapper(d)
+ case _ => d.path -> new NullMapper(d.path)
+ }
+ }).toMap
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ Holder.logger.trace(rawEvent.getRawdata())
+
+ try {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val event = JsonParser.parse(rawEvent.getRawdata())
+ val path = event.Telemetry.encoding_path
+ if (transformers.contains(path)) {
+ Holder.logger.debug("Transforming for " + path)
+ val datapoints = transformers(path).transform(event)
+ val payloads = datapoints.map(d => {
+ new Payload(source, unit.output_topic,
+ rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ (true, payloads)
+ } else {
+ Holder.logger.trace("No transformer in unit for " + path)
+ (false, Set[Payload]())
+ }
+ } catch {
+ case t: Throwable => {
+ Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
+ (false, Set[Payload]())
+ }
+ }
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
new file mode 100644
index 0000000..6c9ad80
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.cisco.xr.telemetry
+
+import org.apache.log4j.Logger
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JObject
+import org.json4s.JsonAST.JValue
+import com.cisco.ztt.meta.Item
+import com.cisco.ztt.meta.Telemetry
+
+class TimeseriesMapper(config: Telemetry, namespace: String) extends Mapper with Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
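+ // Lookup tables from the metadata: which row keys and content fields to emit, with optional ts_name renames.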
+ val wantedKeys = config.keys.getOrElse(Array[Item]()).map( item => {
+ item.name -> item
+ }).toMap
+
+ val wantedValues = config.content.map( item => {
+ item.name -> item
+ }).toMap
+
+ def transform(event: TEvent): Set[TimeseriesDatapoint] = {
+
+ val timeseries = event.Rows.flatMap( row => {
+ val keys = row.Keys
+ .getOrElse(Map[String, String]())
+ .filter(k => wantedKeys.contains(k._1))
+ .map( k => {
+ val wanted = wantedKeys(k._1)
+ val name = if (wanted.ts_name != null) wanted.ts_name else k._1
+ name -> k._2
+ }).toMap + ("host" -> event.Telemetry.node_id_str)
+
+ // Flatten nested maps into container.key -> value
+ val expanded = row.Content.flatMap( v => {
+ if (v._2.isInstanceOf[JObject]) {
+ JsonParser.map(v._2).map( kv => {
+ (v._1 + "." + kv._1) -> kv._2
+ })
+ } else {
+ Map[String, JValue](v)
+ }
+ })
+
+ val items = expanded.filter(v => wantedValues.contains(v._1))
+ .map( data => {
+ val wanted = wantedValues(data._1)
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else data._1)
+ val value = JsonParser.int(data._2)
+ val datapoint = new TimeseriesDatapoint(name, value, row.Timestamp, keys)
+ datapoint
+ })
+ items.toSet
+ })
+
+ timeseries.toSet
+ }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
new file mode 100644
index 0000000..84f5bbb
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+class Metadata(val units: Array[Unit]) extends Serializable {
+ def topics() : Set[String] = {
+ val topics = units.map { case (unit : Unit) => {
+ unit.input_topic
+ }}
+
+ topics.toSet
+ }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
new file mode 100644
index 0000000..4b72cb1
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.meta
+
+import org.apache.log4j.Logger
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
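+// One processing unit is loaded per YAML file. Note that this case class is named Unit and shadows scala.Unit within the package.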
+case class Unit(
+ format: String,
+ input_topic: String,
+ processor: String,
+ output_topic: String,
+ publish_src: String,
+ timeseries_namespace: String,
+ xr_telemetry: Array[Telemetry],
+ ves_telemetry: Array[Telemetry])
+case class Telemetry(path: String, keys: Option[Array[Item]], content: Array[Item])
+case class Item(name: String, display_name: String, ts_name: String)
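+
+// A hypothetical metadata file matching this schema (topic names and paths invented for illustration):
+//
+//   format: cisco.xr.telemetry
+//   input_topic: telemetry.avro
+//   output_topic: timeseries.json
+//   processor: timeseries
+//   timeseries_namespace: router
+//   xr_telemetry:
+//     - path: Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters
+//       keys:
+//         - name: interface-name
+//           ts_name: interface
+//       content:
+//         - name: packets-received
+//           ts_name: rx_packets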
+
+object YamlReader {
+
+ private[this] val logger = Logger.getLogger(getClass().getName());
+
+ val mapper: ObjectMapper = new ObjectMapper(new YAMLFactory())
+ mapper.registerModule(DefaultScalaModule)
+
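+ // Scan the classpath for metadata YAML files and deserialize each one into a Unit descriptor.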
+ def load(pattern: String = "classpath*:meta/*.yaml"): Metadata = {
+ val patternResolver = new PathMatchingResourcePatternResolver()
+ val mappingLocations = patternResolver.getResources(pattern)
+ val units = mappingLocations.map(loc => {
+ logger.info("Reading metadata " + loc)
+ mapper.readValue(loc.getInputStream, classOf[Unit])
+ });
+ new Metadata(units)
+ }
+
+ def parse(yaml: String): Unit = {
+ val meta: Unit = mapper.readValue(yaml, classOf[Unit])
+ meta
+ }
+
+ def dump(meta: Unit) = {
+ println(" input = " + meta.input_topic)
+ println("output = " + meta.output_topic)
+ meta.xr_telemetry.map(m => {
+ println(" path = " + m.path)
+ println("keys:")
+ m.keys.getOrElse(Array[Item]()).map(item => {
+ println(" " + item.name + " -> " + item.display_name)
+ });
+ println("content:")
+ m.content.map(item => {
+ println(" " + item.name + " -> " + item.display_name)
+ });
+ });
+ }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
new file mode 100644
index 0000000..4019262
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.meta.Telemetry
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JValue
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.Item
+import org.json4s.JsonAST.JObject
+
+
+class VesMapper(config: Telemetry, namespace: String) extends Serializable {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = {
+
+ val keys = config.keys.getOrElse(Array[Item]()).map( k => {
+ val value = map.get(k.name).get.toString()
+ k.name -> value
+ }).toMap + ("host" -> host)
+
+ val items = config.content.map( wanted => {
+ val name = namespace + "." + (if (wanted.ts_name != null) wanted.ts_name else wanted.name)
+ val value = map.get(wanted.name).get.toString()
+
+ new TimeseriesDatapoint(name, value, timestamp, keys)
+ })
+
+ items.toSet
+ }
+}
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
new file mode 100644
index 0000000..5eaf8f3
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt.ves.telemetry
+
+import com.cisco.ztt.Transformer
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.ztt.meta.Unit
+import com.cisco.ztt.Payload
+import org.apache.log4j.Logger
+import org.json4s.jackson.JsonMethods
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.JObject
+import com.cisco.ztt.TimeseriesDatapoint
+import org.json4s.JsonAST.JArray
+
+
+class VesTransformer(unit: Unit) extends Serializable with Transformer {
+
+ object Holder extends Serializable {
+ @transient lazy val logger = Logger.getLogger(getClass.getName)
+ }
+
+ def inputTopic: String = { unit.input_topic }
+
+ val mappers = unit.ves_telemetry.map(d => {
+ d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace)
+  })
+
+ def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
+ val source = if (unit.publish_src == null) { "timeseries" } else { unit.publish_src }
+ val parsed = JsonMethods.parse(rawEvent.getRawdata)
+
+ if (! parsed.isInstanceOf[JObject]) {
+ Holder.logger.warn("Cannot process parsed JSON")
+ return (false, Set[Payload]())
+ }
+    // Dereference the VES common event header; lastEpochMicrosec is in
+    // microseconds, so drop three digits to get a millisecond timestamp.
+    val value = parsed.asInstanceOf[JObject].values("event").asInstanceOf[Map[String, JValue]]
+    val header = value("commonEventHeader").asInstanceOf[Map[String, Any]]
+    val host = header("reportingEntityName").toString
+    val timestamp = header("lastEpochMicrosec").toString.dropRight(3)
+
+ val generated = mappers.flatMap(r => {
+ val path = r._1
+ val mapper = r._2
+
+ val datapoints = visit(path, value, mapper, host, timestamp)
+ datapoints.map(d => {
+ new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d)
+ })
+ }).toSet
+ (true, generated)
+ }
+
+ def visit(
+ path: Array[String],
+ map: Map[String, Any],
+ mapper: VesMapper,
+ host: String,
+ timestamp: String): Set[TimeseriesDatapoint] = {
+ if (path.length > 0) {
+ val option = map.get(path.head)
+ option match {
+ case None => {
+ Holder.logger.warn("VES mapper failed to dereference JSON " + path.head)
+ return Set[TimeseriesDatapoint]()
+ }
+
+ case _ => {
+ option.get match {
+ case l: List[_] => {
+ val list = l.asInstanceOf[List[Map[String, Any]]]
+ return list.flatMap(sub => {
+ visit(path.tail, sub, mapper, host, timestamp)
+ }).toSet
+ }
+ case m: Map[_, _] => {
+ val sub = m.asInstanceOf[Map[String, Any]]
+ return visit(path.tail, sub, mapper, host, timestamp)
+            }
+            // Scalar values part-way down the path cannot be descended
+            // into; log and emit nothing rather than throw a MatchError.
+            case _ => {
+              Holder.logger.warn("VES mapper cannot descend into " + path.head)
+              return Set[TimeseriesDatapoint]()
+            }
+          }
+ }
+ }
+ } else {
+ val datapoints = mapper.transform(map, host, timestamp)
+ return datapoints
+ }
+
+ Set[TimeseriesDatapoint]()
+ }
+}
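
A short worked sketch of the visit() recursion, on a hypothetical nested
structure of the shape json4s yields for a VES event body (field names are
illustrative):

    // With path = Array("measurements", "nic"):
    val body = Map[String, Any](
      "measurements" -> List(                    // List node: fan out
        Map("nic" -> Map("rxBytes" -> 1024)),    // Map node: descend
        Map("nic" -> Map("rxBytes" -> 2048))))

    // visit(path, body, mapper, host, timestamp) walks the list, descends
    // each inner map, and once the path is exhausted calls mapper.transform
    // on the two { rxBytes -> ... } leaf maps, unioning the resulting
    // datapoint sets.
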
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
new file mode 100644
index 0000000..57339f8
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[DStream]]s to Kafka
+ * @param dStream [[DStream]] to be written to Kafka
+ */
+class DStreamKafkaWriter[T: ClassTag](@transient private val dStream: DStream[T])
+ extends KafkaWriter[T] with Serializable {
+ /**
+ * Write a [[DStream]] to Kafka
+ * @param producerConfig producer configuration for creating KafkaProducer
+   * @param transformFunc a function used to transform values of type T into [[ProducerRecord]]s
+   * @param callback an optional [[Callback]] to be called after each write; defaults to None.
+ */
+ override def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit =
+ dStream.foreachRDD { rdd =>
+ val rddWriter = new RDDKafkaWriter[T](rdd)
+ rddWriter.writeToKafka(producerConfig, transformFunc, callback)
+ }
+}
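
Each micro-batch is thus delegated to an RDDKafkaWriter, and the producer
cache below amortises producer construction across batches. A minimal sketch
of driving this writer directly (broker address and topic name are
illustrative; in practice the implicit conversions in the package object are
used instead):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.spark.streaming.dstream.DStream

    def writeAll(stream: DStream[String]): Unit = {
      val producerConfig = Map[String, Object](
        "bootstrap.servers" -> "127.0.0.1:9092",
        "key.serializer"    -> classOf[StringSerializer].getName,
        "value.serializer"  -> classOf[StringSerializer].getName)
      new DStreamKafkaWriter[String](stream).writeToKafka(
        producerConfig,
        s => new ProducerRecord[String, String]("output-topic", s))
    }
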
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
new file mode 100644
index 0000000..2fbf6bc
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import java.util.concurrent.{Callable, ExecutionException, TimeUnit}
+
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+/** Cache of [[KafkaProducer]]s */
+object KafkaProducerCache {
+ private type ProducerConf = Seq[(String, Object)]
+ private type ExProducer = KafkaProducer[_, _]
+
+ private val removalListener = new RemovalListener[ProducerConf, ExProducer]() {
+ override def onRemoval(notif: RemovalNotification[ProducerConf, ExProducer]): Unit =
+ notif.getValue.close()
+ }
+
+ private val cacheExpireTimeout = 10.minutes.toMillis
+ private val cache = CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
+ .removalListener(removalListener)
+ .build[ProducerConf, ExProducer]()
+
+ /**
+   * Retrieve a [[KafkaProducer]] from the cache or create a new one
+ * @param producerConfig producer configuration for creating [[KafkaProducer]]
+ * @return a [[KafkaProducer]] already in the cache or a new one
+ */
+ def getProducer[K, V](producerConfig: Map[String, Object]): KafkaProducer[K, V] =
+ try {
+ cache.get(mapToSeq(producerConfig), new Callable[KafkaProducer[K, V]] {
+ override def call(): KafkaProducer[K, V] = new KafkaProducer[K, V](producerConfig.asJava)
+ }).asInstanceOf[KafkaProducer[K, V]]
+ } catch {
+ case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
+ if e.getCause != null => throw e.getCause
+ }
+
+ /**
+ * Flush and close the [[KafkaProducer]] in the cache associated with this config
+   * @param producerConfig producer configuration associated with a [[KafkaProducer]]
+ */
+ def close(producerConfig: Map[String, Object]): Unit = cache.invalidate(mapToSeq(producerConfig))
+
+ private def mapToSeq(m: Map[String, Object]): Seq[(String, Object)] = m.toSeq.sortBy(_._1)
+}
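
Because mapToSeq sorts the configuration by key, two logically equal config
maps resolve to the same cache entry regardless of insertion order, and idle
producers are closed by the removal listener ten minutes after last access.
A minimal sketch (broker address illustrative):

    import org.apache.kafka.common.serialization.StringSerializer

    val confA = Map[String, Object](
      "bootstrap.servers" -> "127.0.0.1:9092",
      "key.serializer"    -> classOf[StringSerializer].getName,
      "value.serializer"  -> classOf[StringSerializer].getName)
    val confB = confA.toList.reverse.toMap  // same entries, different order

    val p1 = KafkaProducerCache.getProducer[String, String](confA)
    val p2 = KafkaProducerCache.getProducer[String, String](confB)
    assert(p1 eq p2)                 // one shared KafkaProducer instance
    KafkaProducerCache.close(confA)  // flushes and closes via the listener
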
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
new file mode 100644
index 0000000..90d2bb1
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used to write DStreams and RDDs to Kafka
+ *
+ * Example usage:
+ * {{{
+ * import com.github.benfradet.spark.kafka.writer.KafkaWriter._
+ * import org.apache.kafka.common.serialization.StringSerializer
+ *
+ * val topic = "my-topic"
+ * val producerConfig = Map(
+ * "bootstrap.servers" -> "127.0.0.1:9092",
+ * "key.serializer" -> classOf[StringSerializer].getName,
+ * "value.serializer" -> classOf[StringSerializer].getName
+ * )
+ *
+ * val dStream: DStream[String] = ...
+ * dStream.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](topic, s)
+ * )
+ *
+ *   val rdd: RDD[String] = ...
+ *   rdd.writeToKafka(
+ *     producerConfig,
+ *     s => new ProducerRecord[String, String](topic, s)
+ *   )
+ * }}}
+ * It is also possible to provide a callback for each write to Kafka;
+ * this is optional and defaults to None.
+ *
+ * Example Usage:
+ * {{{
+ * @transient val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")
+ *
+ * val dStream: DStream[String] = ...
+ * dStream.writeToKafka(
+ * producerConfig,
+ * s => new ProducerRecord[String, String](topic, s),
+ * Some(new Callback with Serializable {
+ * override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
+ * if (Option(e).isDefined) {
+ * log.warn("error sending message", e)
+ * } else {
+ * log.info(s"write succeeded, offset: ${metadata.offset()")
+ * }
+ * }
+ * })
+ * )
+ * }}}
+ */
+abstract class KafkaWriter[T: ClassTag] extends Serializable {
+ /**
+   * Write a DStream or RDD to Kafka
+   * @param producerConfig producer configuration for creating KafkaProducer
+   * @param transformFunc a function used to transform values of type T into [[ProducerRecord]]s
+   * @param callback an optional [[Callback]] to be called after each write; defaults to None.
+ */
+ def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit
+}
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
new file mode 100644
index 0000000..c6f6133
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/RDDKafkaWriter.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka.writer
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * Class used for writing [[RDD]]s to Kafka
+ * @param rdd [[RDD]] to be written to Kafka
+ */
+class RDDKafkaWriter[T: ClassTag](@transient private val rdd: RDD[T])
+ extends KafkaWriter[T] with Serializable {
+ /**
+   * Write an [[RDD]] to Kafka
+   * @param producerConfig producer configuration for creating KafkaProducer
+   * @param transformFunc a function used to transform values of type T into [[ProducerRecord]]s
+   * @param callback an optional [[Callback]] to be called after each write; defaults to None.
+ */
+ override def writeToKafka[K, V](
+ producerConfig: Map[String, Object],
+ transformFunc: T => ProducerRecord[K, V],
+ callback: Option[Callback] = None
+ ): Unit =
+ rdd.foreachPartition { partition =>
+ val producer = KafkaProducerCache.getProducer[K, V](producerConfig)
+ partition
+ .map(transformFunc)
+ .foreach(record => producer.send(record, callback.orNull))
+ }
+}
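
The producer is fetched from the cache once per partition, and send() is
asynchronous, so delivery failures surface only through the optional
callback. A minimal sketch of a serializable error-logging callback, using
log4j as the rest of this tree does:

    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
    import org.apache.log4j.Logger

    object LoggingCallback extends Callback with Serializable {
      @transient private lazy val log = Logger.getLogger(getClass.getName)
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
        if (e != null) log.warn("Kafka send failed", e)
    }
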
diff --git a/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
new file mode 100644
index 0000000..04f5ba7
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/package.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.benfradet.spark.kafka
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/** Implicit conversions between
+ *  - [[DStream]] -> [[KafkaWriter]]
+ *  - [[RDD]] -> [[KafkaWriter]]
+ */
+package object writer {
+ /**
+ * Convert a [[DStream]] to a [[KafkaWriter]] implicitly
+ * @param dStream [[DStream]] to be converted
+ * @return [[KafkaWriter]] ready to write messages to Kafka
+ */
+ implicit def dStreamToKafkaWriter[T: ClassTag, K, V](dStream: DStream[T]): KafkaWriter[T] =
+ new DStreamKafkaWriter[T](dStream)
+
+ /**
+   * Convert an [[RDD]] to a [[KafkaWriter]] implicitly
+ * @param rdd [[RDD]] to be converted
+ * @return [[KafkaWriter]] ready to write messages to Kafka
+ */
+ implicit def rddToKafkaWriter[T: ClassTag, K, V](rdd: RDD[T]): KafkaWriter[T] =
+ new RDDKafkaWriter[T](rdd)
+
+}