diff options
Diffstat (limited to 'sources')
8 files changed, 177 insertions, 77 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 61d28c2b..ac55e55f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -36,6 +36,8 @@ interface Metrics { fun notifyMessageReceived(msg: WireFrameMessage) fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) + fun notifyClientDisconnected() + fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index dce933ab..2e6bb4dc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -30,6 +31,8 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory * @since May 2018 */ object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server = - NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider) + fun createNettyTcpServer(serverConfiguration: ServerConfiguration, + collectorProvider: CollectorProvider, + metrics: Metrics): Server = + NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider, metrics) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index d8d786be..725622f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -22,31 +22,30 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.None import arrow.core.Option import arrow.core.getOrElse -import arrow.core.toOption import arrow.effects.IO import arrow.syntax.collections.firstOption import io.netty.handler.ssl.SslHandler import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Mono import reactor.netty.ByteBufFlux import reactor.netty.Connection import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer -import java.time.Duration -import java.lang.Exception import java.security.cert.X509Certificate +import java.time.Duration import javax.net.ssl.SSLSession @@ -56,15 +55,15 @@ import javax.net.ssl.SSLSession */ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val sslContextFactory: ServerSslContextFactory, - private val collectorProvider: CollectorProvider) : Server { + private val collectorProvider: CollectorProvider, + private val metrics: Metrics) : Server { override fun start(): IO<ServerHandle> = IO { - val tcpServer = TcpServer.create() + TcpServer.create() .addressSupplier { serverConfig.serverListenAddress } .configureSsl() .handle(this::handleConnection) - - NettyServerHandle(tcpServer.bindNow()) + .let { NettyServerHandle(it.bindNow()) } } private fun TcpServer.configureSsl() = @@ -79,13 +78,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { + metrics.notifyClientConnected() val clientContext = ClientContext(nettyOutbound.alloc()) nettyInbound.withConnection { populateClientContext(clientContext, it) it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } } - } logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } @@ -97,7 +96,8 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, { logger.info(clientContext::fullMdc) { "Handling new connection" } nettyInbound.withConnection { conn -> - conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) + conn + .configureIdleTimeout(clientContext, serverConfig.idleTimeout) .logConnectionClosed(clientContext) } it.handleConnection(createDataStream(nettyInbound)) @@ -132,15 +132,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, .receive() .retain() - private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection { - onReadIdle(timeout.toMillis()) { - logger.info(ctx) { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = + onReadIdle(timeout.toMillis()) { + logger.info(ctx) { + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." + } + disconnectClient(ctx) } - disconnectClient(ctx) - } - return this - } + private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { @@ -152,13 +151,11 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } } - private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { - onTerminate().subscribe { - // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled) - logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } - } - return this - } + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = + onDispose { + metrics.notifyClientDisconnected() + logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } + } companion object { private val logger = Logger(NettyTcpServer::class) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index 572cc796..f457aeaf 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -175,7 +175,6 @@ object MetricsSpecification : Spek({ } describe("clients rejected metrics") { - given("rejection causes") { mapOf( ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to @@ -192,7 +191,7 @@ object MetricsSpecification : Spek({ assertThat(metrics.clientRejectionCause.size) .describedAs("metrics were notified with only one rejection cause") .isOne() - assertThat(metrics.clientRejectionCause.get(cause)) + assertThat(metrics.clientRejectionCause[cause]) .describedAs("metrics were notified only once with correct client rejection cause") .isOne() } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index a27d167a..8573d86f 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -21,12 +21,11 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import java.time.Duration import java.time.Instant -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause -import java.util.concurrent.ConcurrentHashMap import kotlin.test.fail /** @@ -35,14 +34,15 @@ import kotlin.test.fail */ class FakeMetrics : Metrics { - var bytesReceived: Int = 0 ; private set - var messageBytesReceived: Int = 0 ; private set - var messagesDroppedCount: Int = 0 ; private set - var lastProcessingTimeMicros: Double = -1.0 ; private set - private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap() - var messagesSentCount: Int = 0 ; private set - val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap() - var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>() ; private set + var bytesReceived: Int = 0; private set + var messageBytesReceived: Int = 0; private set + var messagesDroppedCount: Int = 0; private set + var lastProcessingTimeMicros: Double = -1.0; private set + var messagesSentCount: Int = 0; private set + var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set + + private val messagesSentToTopic = mutableMapOf<String, Int>() + private val messagesDroppedCause = mutableMapOf<MessageDropCause, Int>() override fun notifyBytesReceived(size: Int) { bytesReceived += size @@ -69,6 +69,12 @@ class FakeMetrics : Metrics { clientRejectionCause.compute(cause) { k, _ -> clientRejectionCause[k]?.inc() ?: 1 } } + override fun notifyClientDisconnected() { + } + + override fun notifyClientConnected() { + } + fun messagesOnTopic(topic: String) = messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic") diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index d35e17d6..288145aa 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -50,10 +50,13 @@ class MicrometerMetrics internal constructor( private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) + private val connectionsTotalCount = registry.counter(name(CONNECTIONS, TOTAL, COUNT)) + private val disconnectionsCount = registry.counter(name(DISCONNECTIONS, COUNT)) + private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME)) private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME)) - private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) + private val sentCount = registry.counter(name(MESSAGES, SENT, COUNT)) private val sentToTopicCount = { topic: String -> registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic) }.memoize<String, Counter>() @@ -70,8 +73,13 @@ class MicrometerMetrics internal constructor( init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { - (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) + (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0) + } + + registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) { + (connectionsTotalCount.count() - disconnectionsCount.count()).coerceAtLeast(0.0) } + ClassLoaderMetrics().bindTo(registry) JvmMemoryMetrics().bindTo(registry) JvmGcMetrics().bindTo(registry) @@ -79,7 +87,6 @@ class MicrometerMetrics internal constructor( JvmThreadMetrics().bindTo(registry) } - val metricsProvider = MicrometerPrometheusMetricsProvider(registry) override fun notifyBytesReceived(size: Int) { @@ -93,7 +100,7 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { val now = Instant.now() - sentCountTotal.increment() + sentCount.increment() sentToTopicCount(msg.topic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) @@ -110,11 +117,22 @@ class MicrometerMetrics internal constructor( clientsRejectedCauseCount(cause.tag).increment() } + override fun notifyClientConnected() { + connectionsTotalCount.increment() + } + + override fun notifyClientDisconnected() { + disconnectionsCount.increment() + } + companion object { val INSTANCE = MicrometerMetrics() internal const val PREFIX = "hvves" internal const val MESSAGES = "messages" internal const val RECEIVED = "received" + internal const val DISCONNECTIONS = "disconnections" + internal const val CONNECTIONS = "connections" + internal const val ACTIVE = "active" internal const val BYTES = "bytes" internal const val COUNT = "count" internal const val DATA = "data" @@ -125,6 +143,7 @@ class MicrometerMetrics internal constructor( internal const val REJECTED = "rejected" internal const val TOPIC = "topic" internal const val DROPPED = "dropped" + internal const val TOTAL = "total" internal const val TIME = "time" internal const val LATENCY = "latency" internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index b35dc53d..f9be546a 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -44,7 +44,7 @@ object VesServer : ServerStarter() { config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() - return ServerFactory.createNettyTcpServer(config, collectorProvider) + return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE) } override fun serverStartedMessage(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 71fc8f7f..24355d5d 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge -import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig @@ -35,10 +34,10 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame @@ -46,6 +45,8 @@ import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize import java.time.Instant +import io.micrometer.core.instrument.Meter + import java.time.temporal.Temporal import java.util.concurrent.TimeUnit import kotlin.reflect.KClass @@ -56,7 +57,6 @@ import kotlin.reflect.KClass */ object MicrometerMetricsTest : Spek({ val doublePrecision = Percentage.withPercentage(0.5) - val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time") lateinit var registry: PrometheusMeterRegistry lateinit var cut: MicrometerMetrics @@ -87,6 +87,7 @@ object MicrometerMetricsTest : Spek({ fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = verifyCounter(registrySearch(name), verifier) + fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) { fun <T : Meter> verifyAllMetersAreUnchangedBut( clazz: KClass<T>, @@ -98,7 +99,9 @@ object MicrometerMetricsTest : Spek({ .map { it as T } .filterNot { it.id.name in changedCounters } .forEach { - assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) + assertThat(valueOf(it)) + .describedAs(it.id.toString()) + .isCloseTo(0.0, doublePrecision) } } @@ -108,8 +111,8 @@ object MicrometerMetricsTest : Spek({ } } - describe("notifyBytesReceived") { + describe("notifyBytesReceived") { on("$PREFIX.data.received.bytes counter") { val counterName = "$PREFIX.data.received.bytes" @@ -187,6 +190,7 @@ object MicrometerMetricsTest : Spek({ on("$PREFIX.messages.sent.topic.count counter") { val counterName = "$PREFIX.messages.sent.topic.count" + it("should handle counters for different topics") { cut.notifyMessageSent(routedMessage(topicName1)) cut.notifyMessageSent(routedMessage(topicName2)) @@ -242,13 +246,12 @@ object MicrometerMetricsTest : Spek({ "$PREFIX.messages.processing.time") } } - } describe("notifyMessageDropped") { - on("$PREFIX.messages.dropped.count counter") { val counterName = "$PREFIX.messages.dropped.count" + it("should increment counter") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) @@ -262,6 +265,7 @@ object MicrometerMetricsTest : Spek({ on("$PREFIX.messages.dropped.cause.count counter") { val counterName = "$PREFIX.messages.dropped.cause.count" + it("should handle counters for different drop reasons") { cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) @@ -278,36 +282,38 @@ object MicrometerMetricsTest : Spek({ } } - describe("processing gauge") { - it("should show difference between sent and received messages") { + describe("notifyClientConnected") { + on("$PREFIX.connections.total.count counter") { + val counterName = "$PREFIX.connections.total.count" - on("positive difference") { - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) - cut.notifyMessageSent(routedMessage("perf3gpp")) - verifyGauge("messages.processing.count") { - assertThat(it.value()).isCloseTo(2.0, doublePrecision) - } - } + it("should increment counter") { + cut.notifyClientConnected() + cut.notifyClientConnected() - on("zero difference") { - cut.notifyMessageReceived(emptyWireProtocolFrame()) - cut.notifyMessageSent(routedMessage("perf3gpp")) - verifyGauge("messages.processing.count") { - assertThat(it.value()).isCloseTo(0.0, doublePrecision) + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) } + verifyCountersAndTimersAreUnchangedBut(counterName) } + } - on("negative difference") { - cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) - cut.notifyMessageSent(routedMessage("fault")) - cut.notifyMessageSent(routedMessage("perf3gpp")) - verifyGauge("messages.processing.count") { - assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + + describe("notifyClientDisconnected") { + on("$PREFIX.disconnections.count counter") { + val counterName = "$PREFIX.disconnections.count" + + it("should increment counter") { + cut.notifyClientDisconnected() + cut.notifyClientDisconnected() + + verifyCounter(counterName) { + assertThat(it.count()).isCloseTo(2.0, doublePrecision) } + verifyCountersAndTimersAreUnchangedBut(counterName) } } + } describe("notifyClientRejected") { @@ -342,6 +348,74 @@ object MicrometerMetricsTest : Spek({ } } } + + describe("$PREFIX.messages.processing.count gauge") { + val gaugeName = "$PREFIX.messages.processing.count" + + on("message traffic") { + it("should calculate positive difference between sent and received messages") { + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256)) + cut.notifyMessageSent(routedMessage("perf3gpp")) + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(2.0, doublePrecision) + } + } + + it("should calculate no difference between sent and received messages") { + cut.notifyMessageSent(routedMessage("perf3gpp")) + cut.notifyMessageSent(routedMessage("fault")) + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + + it("should calculate negative difference between sent and received messages") { + cut.notifyMessageSent(routedMessage("fault")) + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } + + describe("$PREFIX.connections.active.count gauge") { + val gaugeName = "$PREFIX.connections.active.count" + + on("connection traffic") { + it("should calculate positive difference between connected and disconnected clients") { + cut.notifyClientConnected() + cut.notifyClientConnected() + cut.notifyClientConnected() + cut.notifyClientDisconnected() + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(2.0, doublePrecision) + } + } + + it("should calculate no difference between connected and disconnected clients") { + cut.notifyClientDisconnected() + cut.notifyClientDisconnected() + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + + it("should calculate negative difference between connected and disconnected clients") { + cut.notifyClientDisconnected() + + verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } }) fun routedMessage(topic: String, partition: Int = 0) = @@ -364,4 +438,4 @@ fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = }.let { evt -> RoutedMessage(topic, partition, VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } + }
\ No newline at end of file |