summaryrefslogtreecommitdiffstats
path: root/pnda-ztt-app/src/main/scala/com/cisco/pnda/StatReporter.scala
blob: b143b62df6fb7c2e7076fcdd347926b55ff012a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
 * Copyright (c) 2018 Cisco Systems. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
  * Name:       StatReporter
  * Purpose:    Report batch processing metrics to the PNDA metric logger
  * Author:     PNDA team
  *
  * Created:    07/04/2016
  */

/*
Copyright (c) 2016 Cisco and/or its affiliates.

This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
and/or its affiliated entities, under various laws including copyright, international treaties, patent,
and/or contract. Any use of the material herein must be in accordance with the terms of the License.
All rights not expressly granted by the License are reserved.

Unless required by applicable law or agreed to separately in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied.
*/

package com.cisco.pnda

import scala.util.control.NonFatal
import java.io.StringWriter
import java.io.PrintWriter
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
import org.apache.log4j.Logger
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient

class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {

    private[this] val logger = Logger.getLogger(getClass().getName())

    override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
        def doSend(metricName: String, metricValue: String) = {
            try {
                val httpClient = new DefaultHttpClient()
                val post = new HttpPost(metricsUrl)
                post.setHeader("Content-type", "application/json")
                val ts = java.lang.System.currentTimeMillis()
                val body = f"""{
                    |    "data": [{
                    |        "source": "application.$appName",
                    |        "metric": "application.kpi.$appName.$metricName",
                    |        "value": "$metricValue",
                    |        "timestamp": $ts%d
                    |    }],
                    |    "timestamp": $ts%d
                    |}""".stripMargin

                logger.debug(body)
                post.setEntity(new StringEntity(body))
                val response = httpClient.execute(post)
                if (response.getStatusLine.getStatusCode() != 200) {
                    logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode())
                }

            } catch {
                case NonFatal(t) => {
                    logger.error("POST failed: " + metricsUrl)
                    val sw = new StringWriter
                    t.printStackTrace(new PrintWriter(sw))
                    logger.error(sw.toString)
                }
            }
        }
        doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
        doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
        doSend("num-records", batch.batchInfo.numRecords.toString())
    }
}