diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
9 files changed, 156 insertions, 12 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt new file mode 100644 index 000000000..be84f479f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +object BlueprintMessageMetricConstants { + + private const val METRIC_PREFIX = "cds.kafka" + + private const val PRODUCED_MESSAGES_PREFIX = "$METRIC_PREFIX.produced.messages" + private const val CONSUMED_MESSAGES_PREFIX = "$METRIC_PREFIX.consumed.messages" + + // COUNTERS + const val KAFKA_PRODUCED_MESSAGES_COUNTER = "$PRODUCED_MESSAGES_PREFIX.total" + const val KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER = "$PRODUCED_MESSAGES_PREFIX.error" + + const val KAFKA_CONSUMED_MESSAGES_COUNTER = "$CONSUMED_MESSAGES_PREFIX.total" + const val KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER = "$CONSUMED_MESSAGES_PREFIX.error" +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt index 9e0c537cf..cb0bc3225 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.JsonNode +import io.micrometer.core.instrument.MeterRegistry import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties @@ -36,17 +37,20 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) -open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesService: BlueprintPropertiesService) { +open class BlueprintMessageLibPropertyService( + private var bluePrintPropertiesService: BlueprintPropertiesService, + private val meterRegistry: MeterRegistry +) { fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { val messageClientProperties = messageProducerProperties(jsonNode) - return KafkaMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties, meterRegistry) } fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" val messageClientProperties = messageProducerProperties(prefix) - return KafkaMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties, meterRegistry) } fun messageProducerProperties(prefix: String): MessageProducerProperties { @@ -184,17 +188,20 @@ open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesSer /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties, + meterRegistry ) } MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaSslAuthMessageConsumerProperties + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties, + meterRegistry ) } MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties, + meterRegistry ) } /** Stream Consumer */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index af689a1f2..004b476b8 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -24,13 +25,16 @@ import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.time.Duration import kotlin.concurrent.thread open class KafkaMessageConsumerService( - private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties, + private val meterRegistry: MeterRegistry ) : BlueprintMessageConsumerService { @@ -78,6 +82,10 @@ open class KafkaMessageConsumerService( runBlocking { consumerRecords?.forEach { consumerRecord -> launch { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic()) + ).increment() /** execute the command block */ if (!channel.isClosedForSend) { channel.send(consumerRecord) @@ -89,6 +97,10 @@ open class KafkaMessageConsumerService( "key(${consumerRecord.key()})" ) } else { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic()) + ).increment() log.error("Channel is closed to receive message") } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 88b0dfaae..21fd84d11 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.node.ObjectNode +import io.micrometer.core.instrument.MeterRegistry import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer @@ -26,14 +27,17 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.slf4j.LoggerFactory import java.nio.charset.Charset class KafkaMessageProducerService( - private val messageProducerProperties: MessageProducerProperties + private val messageProducerProperties: MessageProducerProperties, + private val meterRegistry: MeterRegistry ) : BlueprintMessageProducerService { @@ -76,9 +80,17 @@ class KafkaMessageProducerService( headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> - if (exception != null) + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(topic) + ).increment() + if (exception != null) { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(topic) + ).increment() log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) - else { + } else { val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + "partition(${metadata.partition()}) " + "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt new file mode 100644 index 000000000..7431998d9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt @@ -0,0 +1,29 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.utils + +import io.micrometer.core.instrument.Tag +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants + +class BlueprintMessageUtils { + companion object { + fun kafkaMetricTag(topic: String): MutableList<Tag> = + mutableListOf( + Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic) + ) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f240f76c0..fb53ff45b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import io.mockk.every import io.mockk.spyk import kotlinx.coroutines.channels.consumeEach @@ -47,6 +48,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -100,6 +102,9 @@ open class BlueprintMessageConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testKafkaBasicAuthConsumerService() { runBlocking { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 2293ceec3..1490a3311 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import io.mockk.every import io.mockk.mockk import io.mockk.spyk @@ -37,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguratio import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -74,6 +77,9 @@ open class BlueprintMessageProducerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testKafkaScramSslAuthProducerService() { runBlocking { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt index 70968ef0c..b93cd42d4 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -32,6 +34,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfigu import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -74,6 +77,9 @@ class KafkaStreamsConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testProperties() { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt new file mode 100644 index 000000000..849a411a6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.utils + +import io.micrometer.core.instrument.Tag +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import kotlin.test.assertEquals + +class BlueprintMessageUtilsTest { + + @Test + fun testKafkaMetricTag() { + val expected = mutableListOf<Tag>( + Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, "my-topic") + ) + val tags = BlueprintMessageUtils.kafkaMetricTag("my-topic") + + assertEquals(expected, tags) + } +} |