summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2021-02-05 14:18:06 -0500
committerJulien Fontaine <julien.fontaine@bell.ca>2021-02-11 16:14:30 -0500
commitde13d783a07dbe7f64d377256be465486c65309b (patch)
tree7dd100839e4c06696c6ce8f6799956964d434939 /ms/blueprintsprocessor/modules
parent136e3e19a1821a694d799e7c8d670407720c9e66 (diff)
Added Kafka metrics for CDS workers
Added counters to gather metrics on CDS Kafka workers. This will enable us to get metrics on how many messages we consumer and produce to/from kafka. For consumers we count how many messages we consume and how many failed ie. consumed but not able to be processed (parsing error). For producers we count how many messages we produce and how many failed ie. failed to be pushed to the cluster (unavailable brokers, network error, ...). Relocated metrics tag constants to BlueprintConstants so that they can be use by any CDS module. If they make sense for other metrics then they should be shared. Issue-ID: CCSDK-3155 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: Iad6aba588766f655f3a74cd626e0f74e29188f96
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt14
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt18
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt29
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt35
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt16
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt23
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt)20
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt)36
15 files changed, 231 insertions, 44 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
index fdd1d5d83..cfe436023 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
@@ -248,4 +248,12 @@ object BlueprintConstants {
const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution"
const val NODE_TEMPLATE_TYPE_DG = "dg-generic"
const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates"
+
+ // TAGS
+ const val METRIC_TAG_BP_NAME = "blueprint_name"
+ const val METRIC_TAG_BP_VERSION = "blueprint_version"
+ const val METRIC_TAG_BP_ACTION = "blueprint_action"
+ const val METRIC_TAG_BP_STATUS = "status"
+ const val METRIC_TAG_BP_OUTCOME = "outcome"
+ const val METRIC_TAG_TOPIC = "topic"
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt
new file mode 100644
index 000000000..be84f479f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+object BlueprintMessageMetricConstants {
+
+ private const val METRIC_PREFIX = "cds.kafka"
+
+ private const val PRODUCED_MESSAGES_PREFIX = "$METRIC_PREFIX.produced.messages"
+ private const val CONSUMED_MESSAGES_PREFIX = "$METRIC_PREFIX.consumed.messages"
+
+ // COUNTERS
+ const val KAFKA_PRODUCED_MESSAGES_COUNTER = "$PRODUCED_MESSAGES_PREFIX.total"
+ const val KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER = "$PRODUCED_MESSAGES_PREFIX.error"
+
+ const val KAFKA_CONSUMED_MESSAGES_COUNTER = "$CONSUMED_MESSAGES_PREFIX.total"
+ const val KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER = "$CONSUMED_MESSAGES_PREFIX.error"
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
index 9e0c537cf..cb0bc3225 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.JsonNode
+import io.micrometer.core.instrument.MeterRegistry
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
@@ -36,17 +37,20 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
-open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesService: BlueprintPropertiesService) {
+open class BlueprintMessageLibPropertyService(
+ private var bluePrintPropertiesService: BlueprintPropertiesService,
+ private val meterRegistry: MeterRegistry
+) {
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
val messageClientProperties = messageProducerProperties(jsonNode)
- return KafkaMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties, meterRegistry)
}
fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
val messageClientProperties = messageProducerProperties(prefix)
- return KafkaMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties, meterRegistry)
}
fun messageProducerProperties(prefix: String): MessageProducerProperties {
@@ -184,17 +188,20 @@ open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesSer
/** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
return KafkaMessageConsumerService(
- messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+ messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties,
+ meterRegistry
)
}
MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
return KafkaMessageConsumerService(
- messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+ messageConsumerProperties as KafkaSslAuthMessageConsumerProperties,
+ meterRegistry
)
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
return KafkaMessageConsumerService(
- messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+ messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties,
+ meterRegistry
)
}
/** Stream Consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index af689a1f2..004b476b8 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -24,13 +25,16 @@ import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.time.Duration
import kotlin.concurrent.thread
open class KafkaMessageConsumerService(
- private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
+ private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties,
+ private val meterRegistry: MeterRegistry
) :
BlueprintMessageConsumerService {
@@ -78,6 +82,10 @@ open class KafkaMessageConsumerService(
runBlocking {
consumerRecords?.forEach { consumerRecord ->
launch {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+ ).increment()
/** execute the command block */
if (!channel.isClosedForSend) {
channel.send(consumerRecord)
@@ -89,6 +97,10 @@ open class KafkaMessageConsumerService(
"key(${consumerRecord.key()})"
)
} else {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+ ).increment()
log.error("Channel is closed to receive message")
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 88b0dfaae..21fd84d11 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -18,6 +18,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.node.ObjectNode
+import io.micrometer.core.instrument.MeterRegistry
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
@@ -26,14 +27,17 @@ import org.apache.kafka.common.header.internals.RecordHeader
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
class KafkaMessageProducerService(
- private val messageProducerProperties: MessageProducerProperties
+ private val messageProducerProperties: MessageProducerProperties,
+ private val meterRegistry: MeterRegistry
) :
BlueprintMessageProducerService {
@@ -76,9 +80,17 @@ class KafkaMessageProducerService(
headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
- if (exception != null)
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(topic)
+ ).increment()
+ if (exception != null) {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(topic)
+ ).increment()
log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
- else {
+ } else {
val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
"partition(${metadata.partition()}) " +
"offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
new file mode 100644
index 000000000..7431998d9
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
+
+import io.micrometer.core.instrument.Tag
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+
+class BlueprintMessageUtils {
+ companion object {
+ fun kafkaMetricTag(topic: String): MutableList<Tag> =
+ mutableListOf(
+ Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic)
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index f240f76c0..fb53ff45b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import io.mockk.every
import io.mockk.spyk
import kotlinx.coroutines.channels.consumeEach
@@ -47,6 +48,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -100,6 +102,9 @@ open class BlueprintMessageConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Test
fun testKafkaBasicAuthConsumerService() {
runBlocking {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 2293ceec3..1490a3311 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
@@ -37,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguratio
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -74,6 +77,9 @@ open class BlueprintMessageProducerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Test
fun testKafkaScramSslAuthProducerService() {
runBlocking {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 70968ef0c..b93cd42d4 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@@ -32,6 +34,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfigu
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -74,6 +77,9 @@ class KafkaStreamsConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Test
fun testProperties() {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
new file mode 100644
index 000000000..849a411a6
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
+
+import io.micrometer.core.instrument.Tag
+import org.junit.Test
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import kotlin.test.assertEquals
+
+class BlueprintMessageUtilsTest {
+
+ @Test
+ fun testKafkaMetricTag() {
+ val expected = mutableListOf<Tag>(
+ Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, "my-topic")
+ )
+ val tags = BlueprintMessageUtils.kafkaMetricTag("my-topic")
+
+ assertEquals(expected, tags)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
index 0ab38c6bd..661e76b2b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
@@ -17,13 +17,16 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -43,8 +46,9 @@ import javax.annotation.PreDestroy
)
@Service
open class BlueprintProcessingKafkaConsumer(
- private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService,
- private val executionServiceHandler: ExecutionServiceHandler
+ private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler,
+ private val meterRegistry: MeterRegistry
) {
val log = logger(BlueprintProcessingKafkaConsumer::class)
@@ -69,7 +73,7 @@ open class BlueprintProcessingKafkaConsumer(
/** Get the Message Consumer Service **/
blueprintMessageConsumerService = try {
- bluePrintMessageLibPropertyService
+ blueprintMessageLibPropertyService
.blueprintMessageConsumerService(CONSUMER_SELECTOR)
} catch (e: BlueprintProcessorException) {
val errorMsg = "Failed creating Kafka consumer message service."
@@ -83,7 +87,7 @@ open class BlueprintProcessingKafkaConsumer(
/** Get the Message Producer Service **/
val blueprintMessageProducerService = try {
- bluePrintMessageLibPropertyService
+ blueprintMessageLibPropertyService
.blueprintMessageProducerService(PRODUCER_SELECTOR)
} catch (e: BlueprintProcessorException) {
val errorMsg = "Failed creating Kafka producer message service."
@@ -117,6 +121,10 @@ open class BlueprintProcessingKafkaConsumer(
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(message.topic())
+ ).increment()
log.error("failed in processing the consumed message : $message", e)
} finally {
ph.arriveAndDeregister()
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
index 97c73243d..7c8f4ed5a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
object SelfServiceMetricConstants {
@@ -6,13 +22,6 @@ object SelfServiceMetricConstants {
private const val PROCESS_PREFIX = "$METRICS_PREFIX.process"
- // TAGS
- const val TAG_BP_NAME = "blueprint_name"
- const val TAG_BP_VERSION = "blueprint_version"
- const val TAG_BP_ACTION = "blueprint_action"
- const val TAG_BP_STATUS = "status"
- const val TAG_BP_OUTCOME = "outcome"
-
// COUNTERS
const val COUNTER_PROCESS = "$PROCESS_PREFIX.counter"
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
index aa2938379..c04410a2f 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2019 Bell Canada.
+ * Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
import io.micrometer.core.instrument.Tag
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
import org.springframework.http.HttpStatus
import org.springframework.http.codec.multipart.FilePart
@@ -68,19 +68,19 @@ fun determineHttpStatusCode(statusCode: Int): HttpStatus {
fun cbaMetricTags(executionServiceInput: ExecutionServiceInput): MutableList<Tag> =
executionServiceInput.actionIdentifiers.let {
mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionName)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionName)
)
}
fun cbaMetricTags(executionServiceOutput: ExecutionServiceOutput): MutableList<Tag> =
executionServiceOutput.let {
mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionIdentifiers.actionName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, it.status.code.toString()),
- Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, it.status.message)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionIdentifiers.actionName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, it.status.code.toString()),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, it.status.message)
)
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
index 3a5cebc61..56cc691a4 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -65,7 +66,8 @@ class BlueprintProcessingKafkaConsumerTest {
val bluePrintProcessingKafkaConsumer = BlueprintProcessingKafkaConsumer(
bluePrintMessageLibPropertyService,
- executionServiceHandle
+ executionServiceHandle,
+ meterRegistry
)
launch {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
index 223896885..10db349e6 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
import io.micrometer.core.instrument.Tag
@@ -8,12 +24,12 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.springframework.http.HttpStatus
import org.springframework.test.context.junit4.SpringRunner
@RunWith(SpringRunner::class)
-class UtilsTest {
+class BlueprintProcessingUtilsTest {
@Test
fun `valid Http status codes should be produced for valid parameters`() {
@@ -41,9 +57,9 @@ class UtilsTest {
}
val expectedTags = mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
)
val metricTag = cbaMetricTags(executionServiceInput)
@@ -66,11 +82,11 @@ class UtilsTest {
}
val expectedTags = mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
- Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, executionServiceOutput.status.message)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, executionServiceOutput.status.message)
)
val metricTag = cbaMetricTags(executionServiceOutput)