diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules')
15 files changed, 231 insertions, 44 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt index fdd1d5d83..cfe436023 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt @@ -248,4 +248,12 @@ object BlueprintConstants { const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution" const val NODE_TEMPLATE_TYPE_DG = "dg-generic" const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates" + + // TAGS + const val METRIC_TAG_BP_NAME = "blueprint_name" + const val METRIC_TAG_BP_VERSION = "blueprint_version" + const val METRIC_TAG_BP_ACTION = "blueprint_action" + const val METRIC_TAG_BP_STATUS = "status" + const val METRIC_TAG_BP_OUTCOME = "outcome" + const val METRIC_TAG_TOPIC = "topic" } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt new file mode 100644 index 000000000..be84f479f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +object BlueprintMessageMetricConstants { + + private const val METRIC_PREFIX = "cds.kafka" + + private const val PRODUCED_MESSAGES_PREFIX = "$METRIC_PREFIX.produced.messages" + private const val CONSUMED_MESSAGES_PREFIX = "$METRIC_PREFIX.consumed.messages" + + // COUNTERS + const val KAFKA_PRODUCED_MESSAGES_COUNTER = "$PRODUCED_MESSAGES_PREFIX.total" + const val KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER = "$PRODUCED_MESSAGES_PREFIX.error" + + const val KAFKA_CONSUMED_MESSAGES_COUNTER = "$CONSUMED_MESSAGES_PREFIX.total" + const val KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER = "$CONSUMED_MESSAGES_PREFIX.error" +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt index 9e0c537cf..cb0bc3225 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.JsonNode +import io.micrometer.core.instrument.MeterRegistry import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties @@ -36,17 +37,20 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) -open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesService: BlueprintPropertiesService) { +open class BlueprintMessageLibPropertyService( + private var bluePrintPropertiesService: BlueprintPropertiesService, + private val meterRegistry: MeterRegistry +) { fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { val messageClientProperties = messageProducerProperties(jsonNode) - return KafkaMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties, meterRegistry) } fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" val messageClientProperties = messageProducerProperties(prefix) - return KafkaMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties, meterRegistry) } fun messageProducerProperties(prefix: String): MessageProducerProperties { @@ -184,17 +188,20 @@ open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesSer /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties, + meterRegistry ) } MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaSslAuthMessageConsumerProperties + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties, + meterRegistry ) } MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { return KafkaMessageConsumerService( - messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties, + meterRegistry ) } /** Stream Consumer */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index af689a1f2..004b476b8 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -24,13 +25,16 @@ import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.time.Duration import kotlin.concurrent.thread open class KafkaMessageConsumerService( - private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties, + private val meterRegistry: MeterRegistry ) : BlueprintMessageConsumerService { @@ -78,6 +82,10 @@ open class KafkaMessageConsumerService( runBlocking { consumerRecords?.forEach { consumerRecord -> launch { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic()) + ).increment() /** execute the command block */ if (!channel.isClosedForSend) { channel.send(consumerRecord) @@ -89,6 +97,10 @@ open class KafkaMessageConsumerService( "key(${consumerRecord.key()})" ) } else { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic()) + ).increment() log.error("Channel is closed to receive message") } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 88b0dfaae..21fd84d11 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.node.ObjectNode +import io.micrometer.core.instrument.MeterRegistry import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer @@ -26,14 +27,17 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.slf4j.LoggerFactory import java.nio.charset.Charset class KafkaMessageProducerService( - private val messageProducerProperties: MessageProducerProperties + private val messageProducerProperties: MessageProducerProperties, + private val meterRegistry: MeterRegistry ) : BlueprintMessageProducerService { @@ -76,9 +80,17 @@ class KafkaMessageProducerService( headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> - if (exception != null) + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(topic) + ).increment() + if (exception != null) { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(topic) + ).increment() log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) - else { + } else { val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + "partition(${metadata.partition()}) " + "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt new file mode 100644 index 000000000..7431998d9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt @@ -0,0 +1,29 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.utils + +import io.micrometer.core.instrument.Tag +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants + +class BlueprintMessageUtils { + companion object { + fun kafkaMetricTag(topic: String): MutableList<Tag> = + mutableListOf( + Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic) + ) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f240f76c0..fb53ff45b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import io.mockk.every import io.mockk.spyk import kotlinx.coroutines.channels.consumeEach @@ -47,6 +48,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -100,6 +102,9 @@ open class BlueprintMessageConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testKafkaBasicAuthConsumerService() { runBlocking { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 2293ceec3..1490a3311 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import io.mockk.every import io.mockk.mockk import io.mockk.spyk @@ -37,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguratio import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -74,6 +77,9 @@ open class BlueprintMessageProducerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testKafkaScramSslAuthProducerService() { runBlocking { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt index 70968ef0c..b93cd42d4 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -32,6 +34,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfigu import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -74,6 +77,9 @@ class KafkaStreamsConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService + @MockBean + lateinit var meterRegistry: MeterRegistry + @Test fun testProperties() { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt new file mode 100644 index 000000000..849a411a6 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.utils + +import io.micrometer.core.instrument.Tag +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import kotlin.test.assertEquals + +class BlueprintMessageUtilsTest { + + @Test + fun testKafkaMetricTag() { + val expected = mutableListOf<Tag>( + Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, "my-topic") + ) + val tags = BlueprintMessageUtils.kafkaMetricTag("my-topic") + + assertEquals(expected, tags) + } +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt index 0ab38c6bd..661e76b2b 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt @@ -17,13 +17,16 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api +import io.micrometer.core.instrument.MeterRegistry import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -43,8 +46,9 @@ import javax.annotation.PreDestroy ) @Service open class BlueprintProcessingKafkaConsumer( - private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService, - private val executionServiceHandler: ExecutionServiceHandler + private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler, + private val meterRegistry: MeterRegistry ) { val log = logger(BlueprintProcessingKafkaConsumer::class) @@ -69,7 +73,7 @@ open class BlueprintProcessingKafkaConsumer( /** Get the Message Consumer Service **/ blueprintMessageConsumerService = try { - bluePrintMessageLibPropertyService + blueprintMessageLibPropertyService .blueprintMessageConsumerService(CONSUMER_SELECTOR) } catch (e: BlueprintProcessorException) { val errorMsg = "Failed creating Kafka consumer message service." @@ -83,7 +87,7 @@ open class BlueprintProcessingKafkaConsumer( /** Get the Message Producer Service **/ val blueprintMessageProducerService = try { - bluePrintMessageLibPropertyService + blueprintMessageLibPropertyService .blueprintMessageProducerService(PRODUCER_SELECTOR) } catch (e: BlueprintProcessorException) { val errorMsg = "Failed creating Kafka producer message service." @@ -117,6 +121,10 @@ open class BlueprintProcessingKafkaConsumer( val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { + meterRegistry.counter( + BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, + BlueprintMessageUtils.kafkaMetricTag(message.topic()) + ).increment() log.error("failed in processing the consumed message : $message", e) } finally { ph.arriveAndDeregister() diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt index 97c73243d..7c8f4ed5a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt @@ -1,3 +1,19 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api object SelfServiceMetricConstants { @@ -6,13 +22,6 @@ object SelfServiceMetricConstants { private const val PROCESS_PREFIX = "$METRICS_PREFIX.process" - // TAGS - const val TAG_BP_NAME = "blueprint_name" - const val TAG_BP_VERSION = "blueprint_version" - const val TAG_BP_ACTION = "blueprint_action" - const val TAG_BP_STATUS = "status" - const val TAG_BP_OUTCOME = "outcome" - // COUNTERS const val COUNTER_PROCESS = "$PROCESS_PREFIX.counter" diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt index aa2938379..c04410a2f 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 Bell Canada. + * Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils import io.micrometer.core.instrument.Tag import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException import org.springframework.http.HttpStatus import org.springframework.http.codec.multipart.FilePart @@ -68,19 +68,19 @@ fun determineHttpStatusCode(statusCode: Int): HttpStatus { fun cbaMetricTags(executionServiceInput: ExecutionServiceInput): MutableList<Tag> = executionServiceInput.actionIdentifiers.let { mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionName) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionName) ) } fun cbaMetricTags(executionServiceOutput: ExecutionServiceOutput): MutableList<Tag> = executionServiceOutput.let { mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.actionIdentifiers.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionIdentifiers.actionName), - Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, it.status.code.toString()), - Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, it.status.message) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.actionIdentifiers.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionIdentifiers.actionName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, it.status.code.toString()), + Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, it.status.message) ) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt index 3a5cebc61..56cc691a4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,7 +66,8 @@ class BlueprintProcessingKafkaConsumerTest { val bluePrintProcessingKafkaConsumer = BlueprintProcessingKafkaConsumer( bluePrintMessageLibPropertyService, - executionServiceHandle + executionServiceHandle, + meterRegistry ) launch { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt index 223896885..10db349e6 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt @@ -1,3 +1,19 @@ +/* + * Copyright © 2021 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils import io.micrometer.core.instrument.Tag @@ -8,12 +24,12 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants import org.springframework.http.HttpStatus import org.springframework.test.context.junit4.SpringRunner @RunWith(SpringRunner::class) -class UtilsTest { +class BlueprintProcessingUtilsTest { @Test fun `valid Http status codes should be produced for valid parameters`() { @@ -41,9 +57,9 @@ class UtilsTest { } val expectedTags = mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName) ) val metricTag = cbaMetricTags(executionServiceInput) @@ -66,11 +82,11 @@ class UtilsTest { } val expectedTags = mutableListOf( - Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName), - Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion), - Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName), - Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, executionServiceOutput.status.code.toString()), - Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, executionServiceOutput.status.message) + Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion), + Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName), + Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, executionServiceOutput.status.code.toString()), + Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, executionServiceOutput.status.message) ) val metricTag = cbaMetricTags(executionServiceOutput) |