diff options
Diffstat (limited to 'pnda-ztt-app/src/main/scala/com')
29 files changed, 1827 insertions, 0 deletions
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
new file mode 100644
index 0000000..b143b62
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: StatReporter
+ * Purpose: Report batch processing metrics to the PNDA metric logger
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.pnda
+
+import scala.util.control.NonFatal
+import java.io.StringWriter
+import java.io.PrintWriter
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
+import org.apache.log4j.Logger
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+
+/**
+ * Streaming listener that reports per-batch KPIs to the metric logger at `metricsUrl`.
+ *
+ * After each completed batch it POSTs the processing delay, the scheduling delay and the
+ * record count as separate JSON documents. Delivery is best effort: non-fatal failures are
+ * logged and never rethrown.
+ *
+ * @param appName    application name embedded in the metric source and metric name
+ * @param metricsUrl URL the JSON metric documents are POSTed to
+ */
+class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
+
+  private[this] val logger = Logger.getLogger(getClass().getName())
+
+  /**
+   * Sends the metrics of a completed batch.
+   *
+   * A delay that the batch info does not carry (`None`) is skipped instead of being
+   * unwrapped with `.get`, which would throw `NoSuchElementException`.
+   */
+  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
+    // POST a single metric; any non-fatal failure is logged and swallowed.
+    def doSend(metricName: String, metricValue: String): Unit = {
+      try {
+        val httpClient = new DefaultHttpClient()
+        try {
+          val post = new HttpPost(metricsUrl)
+          post.setHeader("Content-type", "application/json")
+          val ts = java.lang.System.currentTimeMillis()
+          val body = f"""{
+            | "data": [{
+            | "source": "application.$appName",
+            | "metric": "application.kpi.$appName.$metricName",
+            | "value": "$metricValue",
+            | "timestamp": $ts%d
+            | }],
+            | "timestamp": $ts%d
+            |}""".stripMargin
+
+          logger.debug(body)
+          // Encode explicitly; the single-argument constructor falls back to ISO-8859-1.
+          post.setEntity(new StringEntity(body, "UTF-8"))
+          val statusCode = httpClient.execute(post).getStatusLine.getStatusCode()
+          if (statusCode != 200) {
+            logger.error("POST failed: " + metricsUrl + " response:" + statusCode)
+          }
+        } finally {
+          // One client is created per call, so release its connection (and the
+          // unread response) here rather than leaking it on every metric.
+          httpClient.getConnectionManager.shutdown()
+        }
+      } catch {
+        case NonFatal(t) => {
+          logger.error("POST failed: " + metricsUrl)
+          val sw = new StringWriter
+          t.printStackTrace(new PrintWriter(sw))
+          logger.error(sw.toString)
+        }
+      }
+    }
+
+    val info = batch.batchInfo
+    info.processingDelay.foreach(delay => doSend("processing-delay", delay.toString))
+    info.schedulingDelay.foreach(delay => doSend("scheduling-delay", delay.toString))
+    doSend("num-records", info.numRecords.toString)
+  }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
new
file mode 100644
index 0000000..1a08bd4
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEvent.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEvent
+ * Purpose: Data model class for an avro event on Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Value object holding the four fields of a platform event: source, timestamp,
+ * host IP and the raw payload as a string.
+ */
+public class DataPlatformEvent implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+    protected static ObjectMapper _mapper = new ObjectMapper();
+
+    private String _src;
+    private Long _timestamp;
+    private String _hostIp;
+    private String _rawdata;
+
+    public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata)
+    {
+        _src = src;
+        _timestamp = timestamp;
+        _hostIp = host_ip;
+        _rawdata = rawdata;
+    }
+
+    public String getSrc()
+    {
+        return _src;
+    }
+
+    public Long getTimestamp()
+    {
+        return _timestamp;
+    }
+
+    public String getHostIp()
+    {
+        return _hostIp;
+    }
+
+    public String getRawdata()
+    {
+        return _rawdata;
+    }
+
+    /**
+     * Renders the event through the shared Jackson mapper.
+     *
+     * @return the serialized event, or null when serialization fails
+     */
+    @Override
+    public String toString()
+    {
+        try
+        {
+            return _mapper.writeValueAsString(this);
+        }
+        catch (Exception ex)
+        {
+            // NOTE(review): a null toString() surprises callers; kept because
+            // existing code may already rely on it.
+            return null;
+        }
+    }
+
+    /**
+     * Two events are equal when all four fields are equal (null-safe).
+     */
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof DataPlatformEvent))
+        {
+            return false;
+        }
+        DataPlatformEvent that = (DataPlatformEvent) other;
+        return java.util.Objects.equals(this.getSrc(), that.getSrc())
+            && java.util.Objects.equals(this.getTimestamp(), that.getTimestamp())
+            && java.util.Objects.equals(this.getHostIp(), that.getHostIp())
+            && java.util.Objects.equals(this.getRawdata(), that.getRawdata());
+    }
+
+    /**
+     * Hash over the same four fields equals() compares, so that equal events
+     * land in the same bucket of hash-based collections.
+     */
+    @Override
+    public int hashCode()
+    {
+        return java.util.Objects.hash(getSrc(), getTimestamp(), getHostIp(), getRawdata());
+    }
+
+    /**
+     * Parses the raw payload as JSON.
+     *
+     * @return the root node of the parsed payload
+     * @throws JsonProcessingException when the payload is not valid JSON
+     * @throws IOException on a read failure
+     */
+    public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException
+    {
+        return _mapper.readTree(_rawdata);
+    }
+
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
new file mode 100644
index 0000000..ef2bc0d
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/DataPlatformEventCodec.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: DataPlatformEventCodec
+ * Purpose: Encodes and decodes binary event data from kafka to/from a DataPlatformEvent object
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Converts between Avro binary records and {@link DataPlatformEvent} using the
+ * schema definition passed to the constructor.
+ */
+public class DataPlatformEventCodec
+{
+    private static final Logger LOGGER = Logger.getLogger(DataPlatformEventCodec.class.getName());
+
+    private Schema.Parser _parser = new Schema.Parser();
+    private DatumReader<GenericRecord> _reader;
+    private DatumWriter<GenericRecord> _writer;
+    private Schema _schema;
+    private String _schemaDef;
+
+    /**
+     * @param schemaDef Avro schema definition text; it is parsed once here
+     */
+    public DataPlatformEventCodec(String schemaDef) throws IOException
+    {
+        _schemaDef = schemaDef;
+        _schema = _parser.parse(schemaDef);
+        _reader = new GenericDatumReader<GenericRecord>(_schema);
+        _writer = new GenericDatumWriter<GenericRecord>(_schema);
+    }
+
+    /**
+     * Decodes one Avro binary record into an event.
+     *
+     * @param data Avro binary encoding of a record with the fields src, timestamp,
+     *             host_ip and rawdata
+     * @return the decoded event
+     * @throws IOException wrapping any failure; the offending bytes and the schema
+     *                     are logged first
+     */
+    public DataPlatformEvent decode(byte[] data) throws IOException
+    {
+        try
+        {
+            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+            GenericRecord r = _reader.read(null, decoder);
+
+            // Copy only the readable window of the buffer and decode it as UTF-8,
+            // mirroring encode(). Reading array() directly ignores position/limit
+            // and new String(byte[]) depends on the platform default charset.
+            ByteBuffer raw = ((ByteBuffer) r.get("rawdata")).duplicate();
+            byte[] rawBytes = new byte[raw.remaining()];
+            raw.get(rawBytes);
+
+            return new DataPlatformEvent(r.get("src").toString(),
+                                         (Long) r.get("timestamp"),
+                                         r.get("host_ip").toString(),
+                                         new String(rawBytes, "UTF-8"));
+        }
+        catch(Exception ex)
+        {
+            LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
+            throw new IOException(ex);
+        }
+    }
+
+    final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Formats bytes as an upper-case hex string, two characters per byte.
+     */
+    public static String hexStr(byte[] bytes) {
+        char[] hexChars = new char[bytes.length * 2];
+        for ( int j = 0; j < bytes.length; j++ ) {
+            int v = bytes[j] & 0xFF;
+            hexChars[j * 2] = hexArray[v >>> 4];
+            hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+        }
+        return new String(hexChars);
+    }
+
+    /**
+     * Encodes an event as an Avro binary record; rawdata is written as UTF-8 bytes.
+     *
+     * @param e event to encode
+     * @return the Avro binary encoding
+     * @throws IOException when writing the record fails
+     */
+    public byte[] encode(DataPlatformEvent e) throws IOException
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+        GenericRecord datum = new GenericData.Record(_schema);
+        datum.put("src", e.getSrc());
+        datum.put("timestamp", e.getTimestamp());
+        datum.put("host_ip", e.getHostIp());
+        datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
+        _writer.write(datum, encoder);
+        encoder.flush();
+        out.close();
+        return out.toByteArray();
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
new file mode 100644
index 0000000..24a9d35
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/pnda/model/StaticHelpers.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Name: StaticHelpers
+ * Purpose: Helper functions
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.pnda.model;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class StaticHelpers {
+    /**
+     * Reads a classpath resource fully into a string, decoding it as UTF-8.
+     *
+     * @param path resource path, resolved through this class's class loader
+     * @return the resource content, or null when the resource does not exist or
+     *         cannot be read
+     */
+    public static String loadResourceFile(String path)
+    {
+        InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
+        if (is == null)
+        {
+            // Missing resource: same null result as before, without relying on a
+            // NullPointerException from the reader constructor.
+            return null;
+        }
+        // try-with-resources closes the reader, and with it the stream, on every path.
+        try (Reader r = new InputStreamReader(is, "UTF-8"))
+        {
+            char[] buf = new char[2048];
+            StringBuilder s = new StringBuilder();
+            while (true)
+            {
+                int n = r.read(buf);
+                if (n < 0)
+                    break;
+                s.append(buf, 0, n);
+            }
+            return s.toString();
+        }
+        catch (Exception e)
+        {
+            return null;
+        }
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
new file mode 100644
index 0000000..6bc2083
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/App.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import com.cisco.pnda.StatReporter
+import org.apache.log4j.Logger
+import com.cisco.ztt.meta.YamlReader
+import org.apache.log4j.BasicConfigurator
+
+/**
+ * Application entry point: loads configuration and metadata, builds (or restores from a
+ * checkpoint) the streaming context, registers the optional stats reporter and runs until
+ * termination.
+ */
+object App {
+
+  private[this] val logger = Logger.getLogger(getClass().getName())
+
+  def main(args: Array[String]): Unit = {
+
+    BasicConfigurator.configure()
+
+    val props = AppConfig.loadProperties()
+    // getProperty returns null for an absent key; wrap the optional settings.
+    val loggerUrl = Option(props.getProperty("environment.metric_logger_url"))
+    val appName = props.getProperty("component.application")
+    val checkpointDirectory = Option(props.getProperty("app.checkpoint_path")).getOrElse("")
+    // Not used below, but parsing here fails fast on a missing or malformed batch size
+    // even when the context is restored from a checkpoint.
+    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"))
+
+    val metadata = YamlReader.load()
+    if (metadata.units.isEmpty) {
+      logger.error("Trying to run app without metadata")
+      System.exit(1)
+    }
+    val pipeline = new ZttPipeline(metadata)
+
+    // Create the streaming context, or load a saved one from disk
+    val ssc =
+      if (checkpointDirectory.nonEmpty) {
+        StreamingContext.getOrCreate(checkpointDirectory, () => pipeline.create())
+      } else {
+        pipeline.create()
+      }
+
+    sys.ShutdownHookThread {
+      logger.info("Gracefully stopping Spark Streaming Application")
+      ssc.stop(true, true)
+      logger.info("Application stopped")
+    }
+
+    loggerUrl.foreach { url =>
+      logger.info("Reporting stats to url: " + url)
+      ssc.addStreamingListener(new StatReporter(appName, url))
+    }
+    logger.info("Starting spark streaming execution")
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java new file mode 100644 index 0000000..a711cab --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/AppConfig.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Name: AppConfig + * Purpose: Load application properties file. 
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.cisco.ztt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Loads application.properties from the classpath once and caches the result.
+ */
+public class AppConfig
+{
+    private static final Logger LOGGER = Logger.getLogger(AppConfig.class);
+
+    private static Properties _properties;
+    private static Object _lock = new Object();
+
+    /**
+     * Returns the cached application properties, loading them on first use.
+     * The JVM exits with status 1 when the file is missing or unreadable.
+     *
+     * @return the loaded properties
+     */
+    public static Properties loadProperties()
+    {
+        synchronized (_lock)
+        {
+            if (_properties == null)
+            {
+                // Load into a local and publish only after success, so a failed
+                // attempt can never leave an empty Properties cached.
+                Properties loaded = new Properties();
+                try (InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties"))
+                {
+                    if (is == null)
+                    {
+                        // Route a missing file through the same failure path as a
+                        // read error instead of a NullPointerException in load().
+                        throw new IOException("application.properties not found on classpath");
+                    }
+                    loaded.load(is);
+                    LOGGER.info("Properties loaded");
+                }
+                catch (IOException e)
+                {
+                    LOGGER.error("Failed to load properties", e);
+                    System.exit(1);
+                }
+                _properties = loaded;
+            }
+            return _properties;
+        }
+    }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
new file mode 100644
index 0000000..c0bc61e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaInput.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: KafkaInput
+ * Purpose: Generate a dstream from Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
+and/or its affiliated entities, under various laws including copyright, international treaties, patent,
+and/or contract. Any use of the material herein must be in accordance with the terms of the License.
+All rights not expressly granted by the License are reserved.
+
+Unless required by applicable law or agreed to separately in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+express or implied.
+*/
+
+package com.cisco.ztt
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.StringDecoder
+import org.apache.log4j.Logger
+
+/**
+ * Builds the input side of the pipeline: a direct Kafka stream whose Avro payloads are
+ * decoded into DataPlatformEvent objects.
+ */
+class KafkaInput extends Serializable {
+
+  object Holder extends Serializable {
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
+  }
+
+  /**
+   * Creates a stream of decoded events for one topic.
+   *
+   * Brokers, the start-from-beginning flag and the parallelism are read from the
+   * application properties. The stream is repartitioned to `app.processing_parallelism`
+   * before decoding.
+   *
+   * @param ssc   streaming context the stream is attached to
+   * @param topic Kafka topic to consume
+   * @return a stream of decoded events
+   */
+  def readFromKafka(ssc: StreamingContext, topic: String) = {
+    val props = AppConfig.loadProperties()
+    val brokers = props.getProperty("kafka.brokers")
+    // An absent flag means "do not rewind"; previously it caused a NullPointerException.
+    val fromBeginning = Option(props.getProperty("kafka.consume_from_beginning")).exists(_.toBoolean)
+
+    val offsetReset = if (fromBeginning) Map("auto.offset.reset" -> "smallest") else Map.empty[String, String]
+    val kafkaParams = Map("metadata.broker.list" -> brokers) ++ offsetReset
+
+    Holder.logger.info("Registering with kafka using broker " + brokers)
+    Holder.logger.info("Registering with kafka using topic " + topic)
+
+    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
+      ssc, kafkaParams, Set(topic)).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
+
+    // Decode avro container format
+    val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc")
+    messages.mapPartitions(records => {
+      // One codec (and one schema parse) per partition instead of one per record.
+      val eventDecoder = new DataPlatformEventCodec(avroSchemaString)
+      records.map(record => eventDecoder.decode(record._2))
+    })
+  }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
new file mode 100644
index 0000000..1041d00
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/KafkaOutput.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.cisco.pnda.model.DataPlatformEvent
+import com.cisco.pnda.model.DataPlatformEventCodec
+import com.cisco.pnda.model.StaticHelpers
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.github.benfradet.spark.kafka.writer.dStreamToKafkaWriter
+
+
+/**
+ * Output side of the pipeline: serializes each payload's datapoint to JSON, wraps it in a
+ * DataPlatformEvent, Avro-encodes it and publishes it to the payload's topic.
+ */
+class KafkaOutput extends Serializable {
+
+  // Helpers are transient and lazy: they are not shipped with the closure and are built
+  // on first use wherever the closure runs, then reused for every following record.
+  object Holder extends Serializable {
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
+
+    // JSON mapper with Scala support, built once instead of once per record.
+    @transient lazy val mapper: ObjectMapper = {
+      val m = new ObjectMapper()
+      m.registerModule(DefaultScalaModule)
+      m
+    }
+
+    // Avro codec; the schema resource is loaded and parsed once instead of once per record.
+    // NOTE(review): assumes one deserialized closure per task, so the codec is never
+    // shared between threads - confirm for the deployed Spark version.
+    @transient lazy val codec: DataPlatformEventCodec =
+      new DataPlatformEventCodec(StaticHelpers.loadResourceFile("dataplatform-raw.avsc"))
+  }
+
+  /**
+   * Publishes every payload of the stream to Kafka.
+   *
+   * The broker list comes from the `kafka.brokers` application property. Records are
+   * written without a key; the value is the Avro encoding of the event.
+   *
+   * @param output stream of payloads to publish
+   */
+  def writeToKafka(output: DStream[Payload]) = {
+
+    val props = AppConfig.loadProperties()
+    val producerConfig = Map(
+      "bootstrap.servers" -> props.getProperty("kafka.brokers"),
+      "key.serializer" -> classOf[StringSerializer].getName,
+      "value.serializer" -> classOf[ByteArraySerializer].getName
+    )
+    output.writeToKafka(
+      producerConfig,
+      s => {
+        val json = Holder.mapper.writeValueAsString(s.datapoint)
+        val event = new DataPlatformEvent(s.publishSrc, s.timestamp, s.hostIp, json)
+        new ProducerRecord[String, Array[Byte]](s.publishTopic, Holder.codec.encode(event))
+      }
+    )
+  }
+}
\ No newline at end of file diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala new file mode 100644 index 0000000..9077986 --- /dev/null +++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/OpenTSDBOutput.scala @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2018 Cisco Systems. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Name: OpenTSDBOutput + * Purpose: Write a dstream to OpenTSDB + * Author: PNDA team + * + * Created: 07/04/2016 + */ + +/* +Copyright (c) 2016 Cisco and/or its affiliates. + +This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License"). +You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc. +and/or its affiliated entities, under various laws including copyright, international treaties, patent, +and/or contract. Any use of the material herein must be in accordance with the terms of the License. +All rights not expressly granted by the License are reserved. + +Unless required by applicable law or agreed to separately in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. 
+*/
+
+package com.cisco.ztt
+
+import java.io.StringWriter
+
+import scala.Iterator
+
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Pass-through pipeline stage that posts the datapoints of every payload to the OpenTSDB
+ * HTTP API in batches of up to 20.
+ */
+class OpenTSDBOutput extends Serializable {
+
+  object Holder extends Serializable {
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
+  }
+
+  /**
+   * Posts all datapoints of the stream to `http://<opentsdbIP>/api/put` and returns the
+   * payloads unchanged.
+   *
+   * Posting is best effort: a failed request is logged and the payloads still flow on.
+   * When `opentsdbIP` is null or empty nothing is posted.
+   *
+   * @param opentsdbIP host (and optional port) of the OpenTSDB endpoint
+   * @param input      payloads whose datapoints are to be stored
+   * @return a stream carrying the same payloads as `input`
+   */
+  def putOpentsdb[T](opentsdbIP: String,
+      input: DStream[Payload]): DStream[Payload] = {
+    input.mapPartitions(partition => {
+      // One mapper per partition rather than one per group of 20.
+      val mapper = new ObjectMapper()
+      mapper.registerModule(DefaultScalaModule)
+
+      partition.grouped(20).flatMap(group => {
+        val data = group.map(_.datapoint)
+
+        if (data.nonEmpty) {
+          val json = mapper.writeValueAsString(data)
+
+          Holder.logger.debug("Posting " + data.length + " datapoints to OpenTSDB")
+          Holder.logger.debug(json)
+
+          if (opentsdbIP != null && opentsdbIP.length() > 0) {
+            postJson("http://" + opentsdbIP + "/api/put", json)
+          }
+        } else {
+          Holder.logger.debug("No datapoints to post to OpenTSDB")
+        }
+        group
+      })
+    })
+  }
+
+  // POSTs one JSON document; non-fatal failures are logged, fatal errors propagate.
+  private def postJson(url: String, json: String): Unit = {
+    try {
+      val httpClient = new DefaultHttpClient()
+      try {
+        val post = new HttpPost(url)
+        post.setHeader("Content-type", "application/json")
+        // Encode explicitly; the single-argument constructor falls back to ISO-8859-1.
+        post.setEntity(new StringEntity(json, "UTF-8"))
+        val response = httpClient.execute(post)
+        // Holder.logger.debug(EntityUtils.toString(response.getEntity()))
+      } finally {
+        // The client is created per request, so release its connection here.
+        httpClient.getConnectionManager.shutdown()
+      }
+    } catch {
+      case scala.util.control.NonFatal(t) => {
+        Holder.logger.warn("POST to OpenTSDB failed: " + url, t)
+      }
+    }
+  }
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
new file mode 100644
index 0000000..2961bd0
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Payload.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+/**
+ * One datapoint together with the routing information needed to publish it.
+ *
+ * @param publishSrc   source written into the outgoing event
+ * @param publishTopic topic the outgoing event is published to
+ * @param hostIp       host IP written into the outgoing event
+ * @param timestamp    timestamp written into the outgoing event
+ * @param datapoint    the timeseries datapoint being carried
+ */
+class Payload(
+  val publishSrc: String,
+  val publishTopic: String,
+  val hostIp: String,
+  val timestamp: Long,
+  val datapoint: TimeseriesDatapoint)
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
new file mode 100644
index 0000000..7e447bf
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TimeseriesDatapoint.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+/**
+ * A single timeseries sample; all values are carried as strings.
+ *
+ * @param metric    metric name
+ * @param value     sample value
+ * @param timestamp sample time
+ * @param tags      tag names mapped to tag values
+ */
+class TimeseriesDatapoint(
+  val metric: String,
+  val value: String,
+  val timestamp: String,
+  val tags: Map[String, String])
\ No newline at end of file
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
new file mode 100644
index 0000000..aae1e8c
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/TransformManager.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+
+import com.cisco.ztt.meta.Metadata
+import com.cisco.ztt.cisco.xr.telemetry.TelemetryDispatcher
+import com.cisco.ztt.ves.telemetry.VesTransformer
+
+/**
+ * Builds one transformer per metadata unit and indexes the transformers by input topic.
+ */
+class TransformManager(metadata: Metadata) {
+
+  /** One transformer per unit, in unit order. */
+  val transformers: Array[Transformer] = metadata.units.map { unit =>
+    // "ves" units get the VES transformer; every other format, including
+    // "cisco.xr.telemetry", is handled by the telemetry dispatcher.
+    if (unit.format == "ves") new VesTransformer(unit)
+    else new TelemetryDispatcher(unit)
+  }
+
+  /** Transformers grouped by the topic they consume. */
+  val byTopic = transformers.groupBy(_.inputTopic)
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
new file mode 100644
index 0000000..861eb5e
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/Transformer.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cisco.ztt
+
+import com.cisco.pnda.model.DataPlatformEvent
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Converts one platform event into zero or more payloads.
+ */
+trait Transformer {
+  /** Topic whose events this transformer consumes. */
+  def inputTopic: String
+
+  /**
+   * Transforms a single event.
+   *
+   * @param event the event to transform
+   * @return a flag paired with the payloads produced for the event
+   *         (NOTE(review): the flag presumably reports whether the event was
+   *         handled - confirm against the implementations)
+   */
+  def transform(event: DataPlatformEvent): (Boolean, Set[Payload])
+}
diff --git a/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
new file mode 100644
index 0000000..c3b3532
--- /dev/null
+++ b/pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2018 Cisco Systems. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Name: ZttPipeline
+ * Purpose: Set up the spark streaming processing graph.
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+/*
+Copyright (c) 2016 Cisco and/or its affiliates.
+
+This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/ZttPipeline.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Name: ZttPipeline
 * Purpose: Set up the spark streaming processing graph.
 * Author: PNDA team
 *
 * Created: 07/04/2016
 */

/*
Copyright (c) 2016 Cisco and/or its affiliates.

This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
and/or its affiliated entities, under various laws including copyright, international treaties, patent,
and/or contract. Any use of the material herein must be in accordance with the terms of the License.
All rights not expressly granted by the License are reserved.

Unless required by applicable law or agreed to separately in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied.
*/

package com.cisco.ztt

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import com.cisco.ztt.meta.Metadata
import com.cisco.pnda.model.DataPlatformEvent

/**
 * Wires the streaming graph: for every input topic known to the metadata it
 * reads events from Kafka, runs the topic's transformers over each event and
 * hands the resulting payloads to the OpenTSDB and Kafka outputs.
 *
 * @param metadata unit descriptions used to build the transformers
 */
class ZttPipeline(metadata: Metadata) extends Serializable {

  // The logger is looked up lazily and marked transient so it is never part
  // of the serialized closure state.
  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  /**
   * Builds the streaming context and registers one processing chain per
   * input topic.
   *
   * Reads "app.checkpoint_path", "app.batch_size_seconds" and "opentsdb.ip"
   * from the application properties. Checkpointing is enabled only when a
   * non-empty checkpoint path is configured.
   *
   * @return the configured (not yet started) streaming context
   * @throws NumberFormatException if "app.batch_size_seconds" is missing or
   *                               is not an integer
   */
  def create(): StreamingContext = {
    val props = AppConfig.loadProperties()
    // A property that is not set comes back as null; treat it like an empty
    // path instead of failing with a NullPointerException further down.
    val checkpointDirectory = Option(props.getProperty("app.checkpoint_path")).getOrElse("")
    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"))

    val sparkConf = new SparkConf()
    Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
    val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds))

    if (checkpointDirectory.nonEmpty) {
      ssc.checkpoint(checkpointDirectory)
    }

    val transformManager = new TransformManager(metadata)
    // Registration is purely side-effecting, so iterate with foreach rather
    // than building a collection of results nobody reads.
    transformManager.byTopic.foreach { case (topic, transformers) =>
      val inputStream = new KafkaInput().readFromKafka(ssc, topic)

      val timeseriesStream = inputStream.flatMap(dataPlatformEvent => {
        // Every transformer registered for the topic sees every event.
        val results = transformers.map(_.transform(dataPlatformEvent))
        if (!results.exists(_._1)) {
          Holder.logger.info("Did not process " + dataPlatformEvent.getRawdata)
        }
        results.flatMap(_._2)
      })

      val outputStream =
        new OpenTSDBOutput().putOpentsdb(
          props.getProperty("opentsdb.ip"),
          timeseriesStream)

      new KafkaOutput().writeToKafka(outputStream)
    }

    ssc
  }
}

// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/InventoryMapper.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.cisco.xr.telemetry

import com.cisco.ztt.TimeseriesDatapoint
import org.apache.log4j.Logger
import com.cisco.ztt.meta.Telemetry

/**
 * Mapper selected for "inventory" processors. It only logs the configured
 * path and produces no datapoints.
 *
 * @param config telemetry description whose path is logged
 */
class InventoryMapper(config: Telemetry) extends Mapper with Serializable {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  /** Logs the event's configured path at debug level and returns an empty set. */
  def transform(event: TEvent): Set[TimeseriesDatapoint] = {
    Holder.logger.debug("InventoryMapper is sinking " + config.path)
    Set.empty[TimeseriesDatapoint]
  }
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/JsonParser.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.cisco.xr.telemetry

import org.json4s.DefaultFormats
import org.json4s.JsonAST.JArray
import org.json4s.JsonAST.JInt
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput
import scala.reflect.ManifestFactory.arrayType
import scala.reflect.ManifestFactory.classType
import org.json4s.JsonAST.JObject


/** One telemetry row: its timestamp, optional key map and content values. */
case class TRow(Timestamp: String, Keys: Option[Map[String, String]], Content: Map[String, JValue])

/**
 * Header fields of a telemetry message.
 */
// NOTE(review): the three *_time/*_timestamp fields are Int. If the device
// sends epoch milliseconds they will not fit - confirm against real payloads.
case class THeader(node_id_str: String, subscription_id_str: String,
    encoding_path: String, collection_id: Int, collection_start_time: Int,
    msg_timestamp: Int, collection_end_time: Int)

/** A complete telemetry event: source, header and rows. */
case class TEvent(Source: String, Telemetry: THeader, Rows: Array[TRow])


/** Helpers for turning telemetry JSON into typed values. */
object JsonParser {

  // Shared by all extractions in this object.
  private implicit val formats = DefaultFormats

  /**
   * Parses a JSON document and extracts it as a [[TEvent]].
   *
   * @param json the JSON text of one telemetry event
   */
  def parse(json: String): TEvent =
    JsonMethods.parse(json).extract[TEvent]

  /**
   * Extracts a JSON array of objects.
   *
   * @param value must be a JArray, otherwise the cast fails with a
   *              ClassCastException
   */
  def array(value: JValue): Array[Map[String,JValue]] = {
    val jsonArray = value.asInstanceOf[JArray]
    jsonArray.extract[Array[Map[String, JValue]]]
  }

  /**
   * Extracts a JSON object as a map of its fields.
   *
   * @param value must be a JObject, otherwise the cast fails with a
   *              ClassCastException
   */
  def map(value: JValue): Map[String,JValue] = {
    val jsonObject = value.asInstanceOf[JObject]
    jsonObject.extract[Map[String, JValue]]
  }

  /**
   * Renders a JSON integer as its decimal string.
   *
   * @param value must be a JInt, otherwise the cast fails with a
   *              ClassCastException
   */
  def int(value: JValue): String = {
    val number = value.asInstanceOf[JInt]
    number.num.toString
  }
}

// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/Mapper.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.cisco.xr.telemetry

import com.cisco.ztt.TimeseriesDatapoint

/** Turns a parsed telemetry event into a set of timeseries datapoints. */
trait Mapper {
  def transform(event: TEvent): Set[TimeseriesDatapoint]
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/NullMapper.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.cisco.xr.telemetry

import com.cisco.ztt.TimeseriesDatapoint
import org.apache.log4j.Logger


/**
 * Mapper that discards every event. It logs the path it was created for
 * and yields no datapoints.
 *
 * @param path telemetry path this mapper was registered under
 */
class NullMapper(path: String) extends Mapper with Serializable {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  /** Logs the path at debug level and returns an empty set. */
  def transform(event: TEvent): Set[TimeseriesDatapoint] = {
    Holder.logger.debug("NullMapper is sinking " + path)
    Set.empty[TimeseriesDatapoint]
  }
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TelemetryDispatcher.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.cisco.xr.telemetry

import org.apache.log4j.Logger

import scala.util.control.NonFatal

import com.cisco.pnda.model.DataPlatformEvent
import com.cisco.ztt.TimeseriesDatapoint
import com.cisco.ztt.Transformer
import com.cisco.ztt.meta.Unit
import com.cisco.ztt.Payload


/**
 * Transformer for XR telemetry units. Each event is parsed as JSON and
 * routed, by its encoding path, to the mapper configured for that path.
 *
 * @param unit the metadata unit describing topics, processor and paths
 */
class TelemetryDispatcher(unit: Unit) extends Serializable with Transformer {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  def inputTopic: String = { unit.input_topic }

  /**
   * Mapper per telemetry path. The mapper type depends on the unit's
   * processor: "timeseries", "inventory", or a NullMapper for anything else.
   */
  val transformers = unit.xr_telemetry.map( d => {
    Holder.logger.warn("Choosing mapper for " + unit.processor)
    unit.processor match {
      case "timeseries" => d.path -> new TimeseriesMapper(d, unit.timeseries_namespace)
      case "inventory" => d.path -> new InventoryMapper(d)
      case _ => d.path -> new NullMapper(d.path)
    }
  }).toMap

  /**
   * Parses the raw event and runs the mapper registered for its encoding path.
   *
   * @param rawEvent the event whose raw data holds the telemetry JSON
   * @return (true, payloads) when a mapper exists for the event's path;
   *         (false, empty) when no mapper is registered or when parsing or
   *         mapping fails with a non-fatal error
   */
  def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
    Holder.logger.trace(rawEvent.getRawdata())

    try {
      val source = Option(unit.publish_src).getOrElse("timeseries")
      val event = JsonParser.parse(rawEvent.getRawdata())
      val path = event.Telemetry.encoding_path
      // Single lookup instead of contains() followed by apply().
      transformers.get(path) match {
        case Some(mapper) =>
          Holder.logger.debug("Transforming for " + path)
          val payloads = mapper.transform(event).map(d => {
            new Payload(source, unit.output_topic,
              rawEvent.getHostIp, rawEvent.getTimestamp, d)
          })
          (true, payloads)
        case None =>
          Holder.logger.trace("No transformer in unit for " + path)
          (false, Set[Payload]())
      }
    } catch {
      // Only recoverable failures are swallowed; fatal JVM errors and
      // interrupts must propagate rather than be logged and ignored.
      case NonFatal(t) => {
        Holder.logger.error("Failed to parse JSON: " + t.getLocalizedMessage)
        (false, Set[Payload]())
      }
    }
  }
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/cisco/xr/telemetry/TimeseriesMapper.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.cisco.xr.telemetry

import org.apache.log4j.Logger
import com.cisco.ztt.TimeseriesDatapoint
import org.json4s.JsonAST.JObject
import org.json4s.JsonAST.JValue
import com.cisco.ztt.meta.Item
import com.cisco.ztt.meta.Telemetry

/**
 * Maps telemetry rows to timeseries datapoints, keeping only the keys and
 * content fields named in the telemetry configuration.
 *
 * @param config    the wanted keys and content items for one telemetry path
 * @param namespace prefix put in front of every metric name
 */
class TimeseriesMapper(config: Telemetry, namespace: String) extends Mapper with Serializable {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  /** Configured key items indexed by name (empty when no keys are configured). */
  val wantedKeys = config.keys.getOrElse(Array[Item]()).map(item => item.name -> item).toMap

  /** Configured content items indexed by name. */
  val wantedValues = config.content.map(item => item.name -> item).toMap

  /**
   * Produces one datapoint per wanted content field of every row.
   *
   * Tags are the row's wanted keys (renamed to ts_name when set) plus a
   * "host" tag holding the header's node id. Content values that are JSON
   * objects are flattened one level into "container.key" names before
   * filtering. Metric names are namespace + "." + (ts_name or field name).
   */
  def transform(event: TEvent): Set[TimeseriesDatapoint] = {

    val timeseries = event.Rows.flatMap { row =>
      val rowKeys = row.Keys.getOrElse(Map[String, String]())
      val tags = rowKeys.collect {
        case (keyName, keyValue) if wantedKeys.contains(keyName) =>
          Option(wantedKeys(keyName).ts_name).getOrElse(keyName) -> keyValue
      } + ("host" -> event.Telemetry.node_id_str)

      // Flatten nested maps into container.key -> value
      val flattened = row.Content.flatMap {
        case (container, nested: JObject) =>
          JsonParser.map(nested).map { case (field, fieldValue) =>
            (container + "." + field) -> fieldValue
          }
        case plain =>
          Map[String, JValue](plain)
      }

      val datapoints = flattened.collect {
        case (field, fieldValue) if wantedValues.contains(field) =>
          val metric = namespace + "." + Option(wantedValues(field).ts_name).getOrElse(field)
          new TimeseriesDatapoint(metric, JsonParser.int(fieldValue), row.Timestamp, tags)
      }
      datapoints.toSet
    }

    timeseries.toSet
  }

}

// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/Metadata.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.meta

/**
 * The set of units loaded from the metadata files.
 *
 * @param units all loaded units
 */
class Metadata(val units: Array[Unit]) extends Serializable {

  /** The distinct input topics of all units. */
  def topics() : Set[String] = units.map(_.input_topic).toSet
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/meta/YamlReader.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.meta

import org.apache.log4j.Logger
import org.springframework.core.io.support.PathMatchingResourcePatternResolver

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule

/**
 * One metadata unit as read from a YAML file.
 *
 * Note that this class shadows scala.Unit inside this package; the built-in
 * type has to be written as scala.Unit here.
 */
case class Unit(
    format: String,
    input_topic: String,
    processor: String,
    output_topic: String,
    publish_src: String,
    timeseries_namespace: String,
    xr_telemetry: Array[Telemetry],
    ves_telemetry: Array[Telemetry])

/** The wanted keys and content items for one telemetry path. */
case class Telemetry(path: String, keys: Option[Array[Item]], content: Array[Item])

/** A single named field, with its display name and timeseries name. */
case class Item(name: String, display_name: String, ts_name: String)

/** Loads metadata units from YAML. */
object YamlReader {

  private[this] val logger = Logger.getLogger(getClass().getName());

  /** YAML-backed Jackson mapper with Scala support registered. */
  val mapper: ObjectMapper = new ObjectMapper(new YAMLFactory())
  mapper.registerModule(DefaultScalaModule)

  /**
   * Reads every resource matching the pattern as one [[Unit]].
   *
   * @param pattern resource pattern to resolve; defaults to all YAML files
   *                under meta/ on the classpath
   * @return metadata holding one unit per matched resource
   */
  def load(pattern: String = "classpath*:meta/*.yaml"): Metadata = {
    val patternResolver = new PathMatchingResourcePatternResolver()
    val mappingLocations = patternResolver.getResources(pattern)
    val units = mappingLocations.map(loc => {
      logger.info("Reading metadata " + loc)
      mapper.readValue(loc.getInputStream, classOf[Unit])
    });
    new Metadata(units)
  }

  /**
   * Parses a single unit from YAML text.
   *
   * @param yaml the YAML document describing one unit
   */
  def parse(yaml: String): Unit = {
    val meta: Unit = mapper.readValue(yaml, classOf[Unit])
    meta
  }

  /**
   * Prints the unit's topics and its XR telemetry paths, keys and content
   * items to standard output.
   *
   * Units that define no xr_telemetry section print only the two topic
   * lines instead of failing with a NullPointerException.
   */
  def dump(meta: Unit): scala.Unit = {
    println(" input = " + meta.input_topic)
    println("output = " + meta.output_topic)
    // Fields absent from the YAML are left null by the mapper.
    Option(meta.xr_telemetry).getOrElse(Array.empty[Telemetry]).foreach(m => {
      println(" path = " + m.path)
      println("keys:")
      m.keys.getOrElse(Array[Item]()).foreach(item => {
        println(" " + item.name + " -> " + item.display_name)
      });
      println("content:")
      m.content.foreach(item => {
        println(" " + item.name + " -> " + item.display_name)
      });
    });
  }
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesMapper.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.ves.telemetry

import com.cisco.ztt.meta.Telemetry
import com.cisco.ztt.TimeseriesDatapoint
import org.json4s.JsonAST.JValue
import org.apache.log4j.Logger
import com.cisco.ztt.meta.Item
import org.json4s.JsonAST.JObject


/**
 * Maps one VES record (a field map) to timeseries datapoints using the
 * configured key and content items.
 *
 * @param config    the wanted keys and content items
 * @param namespace prefix put in front of every metric name
 */
class VesMapper(config: Telemetry, namespace: String) extends Serializable {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  /**
   * Builds one datapoint per configured content item present in the record.
   *
   * Tags are the configured keys found in the record plus a "host" tag.
   * A configured key or content item that is missing from the record (or
   * whose value is null) is logged and skipped, so that one incomplete
   * record does not abort processing with a bare `None.get` error.
   *
   * @param map       field name -> value for one record
   * @param host      value of the "host" tag
   * @param timestamp timestamp given to every datapoint
   */
  def transform(map: Map[String,Any], host: String, timestamp: String): Set[TimeseriesDatapoint] = {

    // String form of a field, or None (with a warning) when absent or null.
    def present(field: String): Option[String] = {
      val found = map.get(field).flatMap(Option(_)).map(_.toString)
      if (found.isEmpty) {
        Holder.logger.warn("VES record has no value for " + field)
      }
      found
    }

    val keys = (for {
      k <- config.keys.getOrElse(Array[Item]()).toSeq
      value <- present(k.name)
    } yield k.name -> value).toMap + ("host" -> host)

    val items = for {
      wanted <- config.content.toSeq
      value <- present(wanted.name)
    } yield {
      val name = namespace + "." + Option(wanted.ts_name).getOrElse(wanted.name)
      new TimeseriesDatapoint(name, value, timestamp, keys)
    }

    items.toSet
  }
}
// File: pnda-ztt-app/src/main/scala/com/cisco/ztt/ves/telemetry/VesTransformer.scala
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cisco.ztt.ves.telemetry

import scala.util.control.NonFatal

import com.cisco.ztt.Transformer
import com.cisco.pnda.model.DataPlatformEvent
import com.cisco.ztt.meta.Unit
import com.cisco.ztt.Payload
import org.apache.log4j.Logger
import org.json4s.jackson.JsonMethods
import org.json4s.JsonAST.JValue
import org.json4s.JsonAST.JObject
import com.cisco.ztt.TimeseriesDatapoint
import org.json4s.JsonAST.JArray


/**
 * Transformer for VES units. Each event is parsed as JSON; for every
 * configured telemetry path the "event" object is walked down that path
 * and the records found there are handed to the path's [[VesMapper]].
 *
 * @param unit the metadata unit describing topics and VES telemetry paths
 */
class VesTransformer(unit: Unit) extends Serializable with Transformer {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  def inputTopic: String = { unit.input_topic }

  /** Each configured path, split on "/", paired with its mapper. */
  val mappers = unit.ves_telemetry.map(d => {
    d.path.split("/") -> new VesMapper(d, unit.timeseries_namespace)
  }) //.toMap

  /**
   * Transforms one raw VES event.
   *
   * The host tag comes from commonEventHeader.reportingEntityName and the
   * timestamp from commonEventHeader.lastEpochMicrosec with its last three
   * characters dropped.
   *
   * @return (true, payloads) when the event has the expected structure;
   *         (false, empty) when the JSON is not an object, when the event
   *         or header fields are missing, or when parsing or mapping fails
   *         with a non-fatal error. Failures are logged rather than thrown
   *         so a single bad event does not fail the whole batch.
   */
  def transform(rawEvent: DataPlatformEvent): (Boolean, Set[Payload]) = {
    val source = Option(unit.publish_src).getOrElse("timeseries")
    val notHandled = (false, Set[Payload]())

    try {
      JsonMethods.parse(rawEvent.getRawdata) match {
        case parsed: JObject =>
          val fields = for {
            event <- child(parsed.values, "event")
            header <- child(event, "commonEventHeader")
            host <- header.get("reportingEntityName").flatMap(Option(_))
            micros <- header.get("lastEpochMicrosec").flatMap(Option(_))
          } yield (event, host.toString, micros.toString.dropRight(3))

          fields match {
            case Some((event, host, timestamp)) =>
              val generated = mappers.flatMap { case (path, mapper) =>
                visit(path, event, mapper, host, timestamp).map(d => {
                  new Payload(source, unit.output_topic, rawEvent.getHostIp, rawEvent.getTimestamp, d)
                })
              }.toSet
              (true, generated)
            case None =>
              Holder.logger.warn("VES event has no usable event/commonEventHeader fields")
              notHandled
          }
        case _ =>
          Holder.logger.warn("Cannot process parsed JSON")
          notHandled
      }
    } catch {
      case NonFatal(t) =>
        Holder.logger.error("Failed to transform VES event: " + t.getLocalizedMessage)
        notHandled
    }
  }

  /** The value under `key` when it is a JSON object, otherwise None. */
  private def child(map: Map[String, Any], key: String): Option[Map[String, Any]] =
    map.get(key).collect { case m: Map[_, _] => m.asInstanceOf[Map[String, Any]] }

  /**
   * Walks `map` along `path` and applies the mapper to every record reached.
   *
   * An object on the path is descended into; an array on the path is
   * descended into element by element (non-object elements are skipped).
   * A missing segment, or a segment holding a scalar, is logged and yields
   * no datapoints.
   *
   * @param path      remaining path segments; empty means `map` is a record
   * @param map       the JSON object currently being visited
   * @param mapper    mapper applied to the records at the end of the path
   * @param host      host tag passed through to the mapper
   * @param timestamp timestamp passed through to the mapper
   */
  def visit(
      path: Array[String],
      map: Map[String, Any],
      mapper: VesMapper,
      host: String,
      timestamp: String): Set[TimeseriesDatapoint] =
    path.headOption match {
      case None =>
        mapper.transform(map, host, timestamp)

      case Some(segment) =>
        map.get(segment) match {
          case Some(l: List[_]) =>
            l.collect { case sub: Map[_, _] => sub.asInstanceOf[Map[String, Any]] }
              .flatMap(sub => visit(path.tail, sub, mapper, host, timestamp))
              .toSet

          case Some(m: Map[_, _]) =>
            visit(path.tail, m.asInstanceOf[Map[String, Any]], mapper, host, timestamp)

          case Some(_) =>
            Holder.logger.warn("VES mapper expected an object or array at " + segment)
            Set[TimeseriesDatapoint]()

          case None =>
            Holder.logger.warn("VES mapper failed to dereference JSON " + segment)
            Set[TimeseriesDatapoint]()
        }
    }
}
// File: pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/DStreamKafkaWriter.scala
/**
 * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.github.benfradet.spark.kafka.writer

import org.apache.kafka.clients.producer.{Callback, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

import scala.reflect.ClassTag

/**
 * Writes a [[DStream]] to Kafka by writing each of its RDDs.
 * @param dStream [[DStream]] to be written to Kafka
 */
class DStreamKafkaWriter[T: ClassTag](@transient private val dStream: DStream[T])
    extends KafkaWriter[T] with Serializable {
  /**
   * Write a [[DStream]] to Kafka. Every RDD of the stream is handed to an
   * [[RDDKafkaWriter]] with the same arguments.
   * @param producerConfig producer configuration for creating KafkaProducer
   * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
   * @param callback an optional [[Callback]] to be called after each write, default value is None.
   */
  override def writeToKafka[K, V](
    producerConfig: Map[String, Object],
    transformFunc: T => ProducerRecord[K, V],
    callback: Option[Callback] = None
  ): Unit =
    dStream.foreachRDD(rdd =>
      new RDDKafkaWriter[T](rdd).writeToKafka(producerConfig, transformFunc, callback))
}
// File: pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaProducerCache.scala
/**
 * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.github.benfradet.spark.kafka.writer

import java.util.concurrent.{Callable, ExecutionException, TimeUnit}

import com.google.common.cache._
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
import org.apache.kafka.clients.producer.KafkaProducer

import scala.collection.JavaConverters._
import scala.concurrent.duration._

/** Cache of [[KafkaProducer]]s, keyed by producer configuration. */
object KafkaProducerCache {
  private type ProducerConf = Seq[(String, Object)]
  private type ExProducer = KafkaProducer[_, _]

  // Producers are closed as they leave the cache.
  private val removalListener = new RemovalListener[ProducerConf, ExProducer]() {
    override def onRemoval(notif: RemovalNotification[ProducerConf, ExProducer]): Unit =
      notif.getValue.close()
  }

  private val cacheExpireTimeout = 10.minutes.toMillis
  private val cache = CacheBuilder.newBuilder()
    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    .removalListener(removalListener)
    .build[ProducerConf, ExProducer]()

  /**
   * Retrieve a [[KafkaProducer]] in the cache or create a new one.
   * If creating the producer fails, the underlying cause is rethrown
   * instead of the cache's wrapper exception.
   * @param producerConfig producer configuration for creating [[KafkaProducer]]
   * @return a [[KafkaProducer]] already in the cache or a new one
   */
  def getProducer[K, V](producerConfig: Map[String, Object]): KafkaProducer[K, V] = {
    val createProducer = new Callable[KafkaProducer[K, V]] {
      override def call(): KafkaProducer[K, V] = new KafkaProducer[K, V](producerConfig.asJava)
    }
    try {
      val cached = cache.get(mapToSeq(producerConfig), createProducer)
      cached.asInstanceOf[KafkaProducer[K, V]]
    } catch {
      case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
          if e.getCause != null =>
        throw e.getCause
    }
  }

  /**
   * Remove the [[KafkaProducer]] associated with this config from the cache,
   * which closes it through the removal listener.
   * @param producerConfig producer configuration associated to a [[KafkaProducer]]
   */
  def close(producerConfig: Map[String, Object]): Unit = {
    val key = mapToSeq(producerConfig)
    cache.invalidate(key)
  }

  // Sorting by key makes equal configurations produce equal cache keys.
  private def mapToSeq(m: Map[String, Object]): Seq[(String, Object)] = m.toSeq.sortBy(_._1)
}

// File: pnda-ztt-app/src/main/scala/com/github/benfradet/spark/kafka/writer/KafkaWriter.scala
/**
 * Copyright (c) 2016-2017, Benjamin Fradet, and other contributors.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.github.benfradet.spark.kafka.writer

import org.apache.kafka.clients.producer.{Callback, ProducerRecord}

import scala.reflect.ClassTag

/**
 * Class used to write DStreams, RDDs and Datasets to Kafka
 *
 * Example usage:
 * {{{
 *   import com.github.benfradet.spark.kafka.writer.KafkaWriter._
 *   import org.apache.kafka.common.serialization.StringSerializer
 *
 *   val topic = "my-topic"
 *   val producerConfig = Map(
 *     "bootstrap.servers" -> "127.0.0.1:9092",
 *     "key.serializer" -> classOf[StringSerializer].getName,
 *     "value.serializer" -> classOf[StringSerializer].getName
 *   )
 *
 *   val dStream: DStream[String] = ...
 *   dStream.writeToKafka(
 *     producerConfig,
 *     s => new ProducerRecord[String, String](topic, s)
 *   )
 *
 *   val rdd: RDD[String] = ...
 *   rdd.writeToKafka(
 *     producerConfig,
 *     s => new ProducerRecord[String, String](topic, s)
 *   )
 *
 *   val dataset: Dataset[Foo] = ...
 *   dataset.writeToKafka(
 *     producerConfig,
 *     f => new ProducerRecord[String, String](topic, f.toString)
 *   )
 *
 *   val dataFrame: DataFrame = ...
 *   dataFrame.writeToKafka(
 *     producerConfig,
 *     r => new ProducerRecord[String, String](topic, r.get(0).toString)
 *   )
 * }}}
 * It is also possible to provide a callback for each write to Kafka.
 *
 * This is optional and has a value of None by default.
 *
 * Example Usage:
 * {{{
 *   @transient val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")
 *
 *   val dStream: DStream[String] = ...
 *   dStream.writeToKafka(
 *     producerConfig,
 *     s => new ProducerRecord[String, String](topic, s),
 *     Some(new Callback with Serializable {
 *       override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
 *         if (Option(e).isDefined) {
 *           log.warn("error sending message", e)
 *         } else {
 *           log.info(s"write succeeded, offset: ${metadata.offset()}")
 *         }
 *       }
 *     })
 *   )
 * }}}
 */
abstract class KafkaWriter[T: ClassTag] extends Serializable {
  /**
   * Write a DStream, RDD, or Dataset to Kafka
   * @param producerConfig producer configuration for creating KafkaProducer
   * @param transformFunc a function used to transform values of T type into [[ProducerRecord]]s
   * @param callback an optional [[Callback]] to be called after each write, default value is None.
   */
  def writeToKafka[K, V](
    producerConfig: Map[String, Object],
    transformFunc: T => ProducerRecord[K, V],
    callback: Option[Callback] = None
  ): Unit
}
/**
 * [[KafkaWriter]] implementation that sends the elements of an [[RDD]] to Kafka.
 *
 * @param rdd the [[RDD]] whose elements are written; declared `@transient` so the
 *            reference is not serialized together with this writer
 * @tparam T type of the elements of the [[RDD]]
 */
class RDDKafkaWriter[T: ClassTag](@transient private val rdd: RDD[T])
  extends KafkaWriter[T] with Serializable {

  /**
   * Send every element of the wrapped [[RDD]] to Kafka.
   *
   * @param producerConfig producer configuration handed to `KafkaProducerCache.getProducer`
   * @param transformFunc converts each element of type T into the [[ProducerRecord]] to send
   * @param callback an optional [[Callback]] passed along with every send; None by default
   * @tparam K key type of the produced [[ProducerRecord]]s
   * @tparam V value type of the produced [[ProducerRecord]]s
   */
  override def writeToKafka[K, V](
    producerConfig: Map[String, Object],
    transformFunc: T => ProducerRecord[K, V],
    callback: Option[Callback] = None
  ): Unit =
    rdd.foreachPartition { elements =>
      // The producer is looked up once per partition, before any element is consumed.
      val kafkaProducer = KafkaProducerCache.getProducer[K, V](producerConfig)
      for (element <- elements) {
        // orNull maps a missing callback (None) to null for the Java send API.
        kafkaProducer.send(transformFunc(element), callback.orNull)
      }
    }
}
/**
 * Implicit conversions that make `writeToKafka` available on Spark collections:
 *  - [[DStream]] -> [[KafkaWriter]]
 *  - [[RDD]] -> [[KafkaWriter]]
 */
package object writer {
  // Implicit conversion methods are a gated language feature; enabling it here keeps
  // the definitions below free of feature warnings without requiring a compiler flag.
  import scala.language.implicitConversions

  /**
   * Convert a [[DStream]] to a [[KafkaWriter]] implicitly
   * @param dStream [[DStream]] to be converted
   * @tparam T type of the elements of the [[DStream]]
   * @tparam K not used by the conversion; kept so the signature stays source-compatible
   * @tparam V not used by the conversion; kept so the signature stays source-compatible
   * @return [[KafkaWriter]] ready to write messages to Kafka
   */
  implicit def dStreamToKafkaWriter[T: ClassTag, K, V](dStream: DStream[T]): KafkaWriter[T] =
    new DStreamKafkaWriter[T](dStream)

  /**
   * Convert a [[RDD]] to a [[KafkaWriter]] implicitly
   * @param rdd [[RDD]] to be converted
   * @tparam T type of the elements of the [[RDD]]
   * @tparam K not used by the conversion; kept so the signature stays source-compatible
   * @tparam V not used by the conversion; kept so the signature stays source-compatible
   * @return [[KafkaWriter]] ready to write messages to Kafka
   */
  implicit def rddToKafkaWriter[T: ClassTag, K, V](rdd: RDD[T]): KafkaWriter[T] =
    new RDDKafkaWriter[T](rdd)
}