diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2021-02-05 14:18:06 -0500 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2021-02-11 16:14:30 -0500 |
commit | de13d783a07dbe7f64d377256be465486c65309b (patch) | |
tree | 7dd100839e4c06696c6ce8f6799956964d434939 /ms/blueprintsprocessor/modules/inbounds | |
parent | 136e3e19a1821a694d799e7c8d670407720c9e66 (diff) |
Added Kafka metrics for CDS workers
Added counters to gather metrics on CDS Kafka workers.
This will enable us to get metrics on how many messages we consumer and produce to/from kafka.
For consumers we count how many messages we consume and how many failed ie. consumed but not able to be processed (parsing error).
For producers we count how many messages we produce and how many failed ie. failed to be pushed to the cluster (unavailable brokers, network error, ...).
Relocated metrics tag constants to BlueprintConstants so that they can be use by any CDS module.
If they make sense for other metrics then they should be shared.
Issue-ID: CCSDK-3155
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Iad6aba588766f655f3a74cd626e0f74e29188f96
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
5 files changed, 67 insertions, 32 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt index 0ab38c6bd..661e76b2b 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt @@ -17,13 +17,16 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -43,8 +46,9 @@ import javax.annotation.PreDestroy ) @Service open class BlueprintProcessingKafkaConsumer( - private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService, - private val executionServiceHandler: ExecutionServiceHandler + private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler, + private val meterRegistry: MeterRegistry ) { val log = logger(BlueprintProcessingKafkaConsumer::class) @@ -69,7 +73,7 @@ open class BlueprintProcessingKafkaConsumer( /** Get the Message Consumer Service **/ blueprintMessageConsumerService = try { - bluePrintMessageLibPropertyService + blueprintMessageLibPropertyService .blueprintMessageConsumerService(CONSUMER_SELECTOR) } catch (e: BlueprintProcessorException) { val errorMsg = "Failed creating Kafka consumer message service." @@ -83,7 +87,7 @@ open class BlueprintProcessingKafkaConsumer( /** Get the Message Producer Service **/ val blueprintMessageProducerService = try { - bluePrintMessageLibPropertyService + blueprintMessageLibPropertyService .blueprintMessageProducerService(PRODUCER_SELECTOR) } catch (e: BlueprintProcessorException) { val errorMsg = "Failed creating Kafka producer message service." @@ -117,6 +121,10 @@ open class BlueprintProcessingKafkaConsumer( val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(message.topic()) + ).increment() log.error("failed in processing the consumed message : $message", e) } finally { ph.arriveAndDeregister() diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt index 97c73243d..7c8f4ed5a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt @@ -1,3 +1,19 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api object SelfServiceMetricConstants { @@ -6,13 +22,6 @@ object SelfServiceMetricConstants { private const val PROCESS_PREFIX = "$METRICS_PREFIX.process" - // TAGS - const val TAG_BP_NAME = "blueprint_name" - const val TAG_BP_VERSION = "blueprint_version" - const val TAG_BP_ACTION = "blueprint_action" - const val TAG_BP_STATUS = "status" - const val TAG_BP_OUTCOME = "outcome" - // COUNTERS const val COUNTER_PROCESS = "$PROCESS_PREFIX.counter" diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt index aa2938379..c04410a2f 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 Bell Canada. + * Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils import io.micrometer.core.instrument.Tag import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException import org.springframework.http.HttpStatus import org.springframework.http.codec.multipart.FilePart @@ -68,19 +68,19 @@ fun determineHttpStatusCode(statusCode: Int): HttpStatus { fun cbaMetricTags(executionServiceInput: ExecutionServiceInput): MutableList<Tag> = executionServiceInput.actionIdentifiers.let { mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionName) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionName) ) } fun cbaMetricTags(executionServiceOutput: ExecutionServiceOutput): MutableList<Tag> = executionServiceOutput.let { mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.actionIdentifiers.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionIdentifiers.actionName), - Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, it.status.code.toString()), - Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, it.status.message) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.actionIdentifiers.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionIdentifiers.actionName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, it.status.code.toString()), + Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, it.status.message) ) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt index 3a5cebc61..56cc691a4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,7 +66,8 @@ class BlueprintProcessingKafkaConsumerTest { val bluePrintProcessingKafkaConsumer = BlueprintProcessingKafkaConsumer( bluePrintMessageLibPropertyService, - executionServiceHandle + executionServiceHandle, + meterRegistry ) launch { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt index 223896885..10db349e6 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt @@ -1,3 +1,19 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils import io.micrometer.core.instrument.Tag @@ -8,12 +24,12 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants import org.springframework.http.HttpStatus import org.springframework.test.context.junit4.SpringRunner @RunWith(SpringRunner::class) -class UtilsTest { +class BlueprintProcessingUtilsTest { @Test fun `valid Http status codes should be produced for valid parameters`() { @@ -41,9 +57,9 @@ class UtilsTest { } val expectedTags = mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName) ) val metricTag = cbaMetricTags(executionServiceInput) @@ -66,11 +82,11 @@ class UtilsTest { } val expectedTags = mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName), - Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, executionServiceOutput.status.code.toString()), - Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, executionServiceOutput.status.message) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, executionServiceOutput.status.code.toString()), + Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, executionServiceOutput.status.message) ) val metricTag = cbaMetricTags(executionServiceOutput) |