diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2021-02-05 14:18:06 -0500 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2021-02-11 16:14:30 -0500 |
commit | de13d783a07dbe7f64d377256be465486c65309b (patch) | |
tree | 7dd100839e4c06696c6ce8f6799956964d434939 /ms/blueprintsprocessor/modules/commons/message-lib | |
parent | 136e3e19a1821a694d799e7c8d670407720c9e66 (diff) |
Added Kafka metrics for CDS workers
Added counters to gather metrics on CDS Kafka workers.
This will enable us to get metrics on how many messages we consumer and produce to/from kafka.
For consumers we count how many messages we consume and how many failed ie. consumed but not able to be processed (parsing error).
For producers we count how many messages we produce and how many failed ie. failed to be pushed to the cluster (unavailable brokers, network error, ...).
Relocated metrics tag constants to BlueprintConstants so that they can be use by any CDS module.
If they make sense for other metrics then they should be shared.
Issue-ID: CCSDK-3155
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Iad6aba588766f655f3a74cd626e0f74e29188f96
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
9 files changed, 156 insertions, 12 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt new file mode 100644 index 000000000..be84f479f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +object BlueprintMessageMetricConstants { + + private const val METRIC_PREFIX = "cds.kafka" + + private const val PRODUCED_MESSAGES_PREFIX = "$METRIC_PREFIX.produced.messages" + private const val CONSUMED_MESSAGES_PREFIX = "$METRIC_PREFIX.consumed.messages" + + // COUNTERS + const val KAFKA_PRODUCED_MESSAGES_COUNTER = "$PRODUCED_MESSAGES_PREFIX.total" + const val KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER = "$PRODUCED_MESSAGES_PREFIX.error" + + const val KAFKA_CONSUMED_MESSAGES_COUNTER = "$CONSUMED_MESSAGES_PREFIX.total" + const val KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER = "$CONSUMED_MESSAGES_PREFIX.error" +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt index 9e0c537cf..cb0bc3225 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.JsonNode +import io.micrometer.core.instrument.MeterRegistry import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties @@ -36,17 +37,20 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) -open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesService: BlueprintPropertiesService) { +open class BlueprintMessageLibPropertyService( + private var bluePrintPropertiesService: BlueprintPropertiesService, + private val meterRegistry: MeterRegistry +) { fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { val messageClientProperties = messageProducerProperties(jsonNode) - return KafkaMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties, meterRegistry) } fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" val messageClientProperties = messageProducerProperties(prefix) - return KafkaMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties, meterRegistry) } fun messageProducerProperties(prefix: String): MessageProducerProperties { @@ -184,17 +188,20 @@ open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesSer /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties, + meterRegistry ) } MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaSslAuthMessageConsumerProperties + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties, + meterRegistry ) } MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties, + meterRegistry ) } /** Stream Consumer */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index af689a1f2..004b476b8 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -24,13 +25,16 @@ import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.time.Duration import kotlin.concurrent.thread open class KafkaMessageConsumerService( - private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties, + private val meterRegistry: MeterRegistry ) : BlueprintMessageConsumerService { @@ -78,6 +82,10 @@ open class KafkaMessageConsumerService( runBlocking { consumerRecords?.forEach { consumerRecord -> launch { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic()) + ).increment() /** execute the command block */ if (!channel.isClosedForSend) { channel.send(consumerRecord) @@ -89,6 +97,10 @@ open class KafkaMessageConsumerService( "key(${consumerRecord.key()})" ) } else { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic()) + ).increment() log.error("Channel is closed to receive message") } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 88b0dfaae..21fd84d11 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.node.ObjectNode +import io.micrometer.core.instrument.MeterRegistry import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer @@ -26,14 +27,17 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.slf4j.LoggerFactory import java.nio.charset.Charset class KafkaMessageProducerService( - private val messageProducerProperties: MessageProducerProperties + private val messageProducerProperties: MessageProducerProperties, + private val meterRegistry: MeterRegistry ) : BlueprintMessageProducerService { @@ -76,9 +80,17 @@ class KafkaMessageProducerService( headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> - if (exception != null) + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(topic) + ).increment() + if (exception != null) { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(topic) + ).increment() log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) - else { + } else { val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + "partition(${metadata.partition()}) " + "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt new file mode 100644 index 000000000..7431998d9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt @@ -0,0 +1,29 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.utils + +import io.micrometer.core.instrument.Tag +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants + +class BlueprintMessageUtils { + companion object { + fun kafkaMetricTag(topic: String): MutableList<Tag> = + mutableListOf( + Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic) + ) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f240f76c0..fb53ff45b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import io.mockk.every import io.mockk.spyk import kotlinx.coroutines.channels.consumeEach @@ -47,6 +48,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -100,6 +102,9 @@ open class BlueprintMessageConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testKafkaBasicAuthConsumerService() { runBlocking { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 2293ceec3..1490a3311 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import io.mockk.every import io.mockk.mockk import io.mockk.spyk @@ -37,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguratio import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -74,6 +77,9 @@ open class BlueprintMessageProducerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testKafkaScramSslAuthProducerService() { runBlocking { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt index 70968ef0c..b93cd42d4 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -32,6 +34,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfigu import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -74,6 +77,9 @@ class KafkaStreamsConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testProperties() { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt new file mode 100644 index 000000000..849a411a6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.utils + +import io.micrometer.core.instrument.Tag +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import kotlin.test.assertEquals + +class BlueprintMessageUtilsTest { + + @Test + fun testKafkaMetricTag() { + val expected = mutableListOf<Tag>( + Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, "my-topic") + ) + val tags = BlueprintMessageUtils.kafkaMetricTag("my-topic") + + assertEquals(expected, tags) + } +} |