aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
blob: a949803fa26d698256ad1e3674178515aba1a94d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018-2020 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.dcae.collectors.veshv.main.metrics

import arrow.syntax.function.memoize
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
import java.time.Duration
import java.time.Instant


/**
 * [Metrics] implementation that publishes collector statistics through a Micrometer
 * [PrometheusMeterRegistry].
 *
 * Every meter is registered under a name produced by [name], i.e. dot-separated segments
 * prefixed with [PREFIX]. On construction the class also registers an "active connections"
 * gauge and binds the class loader, JVM memory, GC, processor and JVM thread binders to the
 * same registry.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
class MicrometerMetrics internal constructor(
        private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
    private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
    private val receivedMessages = registry.counter(name(MESSAGES, RECEIVED))
    private val receivedMessagesPayloadBytes = registry.counter(name(MESSAGES, RECEIVED, PAYLOAD, BYTES))

    private val totalConnections = registry.counter(name(CONNECTIONS))
    private val disconnections = registry.counter(name(DISCONNECTIONS))

    // All timers share the same histogram configuration, see histogramTimer().
    private val travelTimeToCollector: Timer = histogramTimer(MESSAGES, TO, COLLECTOR, TRAVEL, TIME)
    private val processingTime: Timer = histogramTimer(MESSAGES, PROCESSING, TIME)
    private val processingTimeWithoutRouting: Timer = histogramTimer(MESSAGES, PROCESSING, TIME, WITHOUT, ROUTING)
    private val totalLatencyWithoutRouting: Timer = histogramTimer(MESSAGES, LATENCY, WITHOUT, ROUTING)
    private val totalLatency: Timer = histogramTimer(MESSAGES, LATENCY)

    private val sentMessages = registry.counter(name(MESSAGES, SENT))
    private val sentMessagesByTopic: (String) -> Counter = countersByTag(name(MESSAGES, SENT, TOPIC), TOPIC)
    private val droppedMessages = registry.counter(name(MESSAGES, DROPPED))
    private val messagesDroppedByCause: (String) -> Counter = countersByTag(name(MESSAGES, DROPPED, CAUSE), CAUSE)
    private val clientsRejected = registry.counter(name(CLIENTS, REJECTED))
    private val clientsRejectedByCause: (String) -> Counter = countersByTag(name(CLIENTS, REJECTED, CAUSE), CAUSE)

    init {
        // Active connections are not tracked separately: the gauge is derived from the two
        // counters above and the difference is floored at zero.
        registry.gauge(name(CONNECTIONS, ACTIVE), this) {
            (totalConnections.count() - disconnections.count()).coerceAtLeast(0.0)
        }

        ClassLoaderMetrics().bindTo(registry)
        JvmMemoryMetrics().bindTo(registry)
        JvmGcMetrics().bindTo(registry)
        ProcessorMetrics().bindTo(registry)
        JvmThreadMetrics().bindTo(registry)
    }

    /** Provider built on top of the same registry that this class records into. */
    val metricsProvider = MicrometerPrometheusMetricsProvider(registry)

    /** Adds [size] to the received-bytes counter. */
    override fun notifyBytesReceived(size: Int) {
        receivedBytes.increment(size.toDouble())
    }

    /**
     * Records, relative to the current instant, the time elapsed since the wire frame of [msg]
     * was received and the time elapsed since the `lastEpochMicrosec` timestamp in its header.
     */
    override fun notifyMessageReadyForRouting(msg: VesMessage) {
        val now = Instant.now()
        processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, now))
        totalLatencyWithoutRouting.record(Duration.between(epochMicroToInstant(msg.header.lastEpochMicrosec), now))
    }

    /** Counts one received message and adds its payload size to the payload-bytes counter. */
    override fun notifyMessageReceived(msg: WireFrameMessage) {
        receivedMessages.increment()
        receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble())
    }

    /**
     * Records the time between the `lastEpochMicrosec` timestamp in the header of [msg] and
     * the instant its wire frame was received.
     */
    override fun notifyMessageReceived(msg: VesMessage) {
        travelTimeToCollector.record(
                Duration.between(epochMicroToInstant(msg.header.lastEpochMicrosec), msg.wtpFrame.receivedAt)
        )
    }

    /**
     * Counts one sent message (in total and for the target topic of [msg]) and records, relative
     * to the current instant, the time since its wire frame was received and the time since the
     * `lastEpochMicrosec` timestamp in its header.
     */
    override fun notifyMessageSent(msg: RoutedMessage) {
        // Taken before touching the counters so both timers use the same reference instant.
        val now = Instant.now()
        sentMessages.increment()
        sentMessagesByTopic(msg.targetTopic).increment()

        processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
        totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
    }

    /** Counts one dropped message, in total and for the tag of [cause]. */
    override fun notifyMessageDropped(cause: MessageDropCause) {
        droppedMessages.increment()
        messagesDroppedByCause(cause.tag).increment()
    }

    /** Counts one rejected client, in total and for the tag of [cause]. */
    override fun notifyClientRejected(cause: ClientRejectionCause) {
        clientsRejected.increment()
        clientsRejectedByCause(cause.tag).increment()
    }

    /** Increments the total connections counter. */
    override fun notifyClientConnected() {
        totalConnections.increment()
    }

    /** Increments the disconnections counter. */
    override fun notifyClientDisconnected() {
        disconnections.increment()
    }

    /**
     * Registers a timer named `name(*nameParts)` that publishes a percentile histogram with
     * [MAX_BUCKET_DURATION] as its maximum expected value.
     */
    private fun histogramTimer(vararg nameParts: String): Timer =
            Timer.builder(name(*nameParts))
                    .maximumExpectedValue(MAX_BUCKET_DURATION)
                    .publishPercentileHistogram(true)
                    .register(registry)

    /**
     * Returns a memoized function mapping a tag value to the counter registered as
     * [metricName] with the tag [tagKey] set to that value.
     */
    private fun countersByTag(metricName: String, tagKey: String): (String) -> Counter =
            { tagValue: String ->
                registry.counter(metricName, tagKey, tagValue)
            }.memoize<String, Counter>()

    companion object {
        /** Lazily created shared instance backed by a default Prometheus registry. */
        val INSTANCE: MicrometerMetrics by lazy { MicrometerMetrics() }
        internal const val PREFIX = "hvves"
        internal const val MESSAGES = "messages"
        internal const val RECEIVED = "received"
        internal const val DISCONNECTIONS = "disconnections"
        internal const val CONNECTIONS = "connections"
        internal const val ACTIVE = "active"
        internal const val BYTES = "bytes"
        internal const val DATA = "data"
        internal const val SENT = "sent"
        internal const val PROCESSING = "processing"
        internal const val CAUSE = "cause"
        internal const val CLIENTS = "clients"
        internal const val REJECTED = "rejected"
        internal const val TOPIC = "topic"
        internal const val DROPPED = "dropped"
        internal const val TIME = "time"
        internal const val LATENCY = "latency"
        internal const val PAYLOAD = "payload"
        internal const val WITHOUT = "without"
        internal const val ROUTING = "routing"
        internal const val TRAVEL = "travel"
        internal const val TO = "to"
        internal const val COLLECTOR = "collector"

        /** Maximum expected value configured on every timer (300 seconds). */
        internal val MAX_BUCKET_DURATION: Duration = Duration.ofSeconds(300L)

        /** Builds a meter name: [PREFIX] followed by the given segments, all joined with dots. */
        internal fun name(vararg name: String): String = "$PREFIX.${name.joinToString(".")}"
    }
}