summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt14
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt18
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt29
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt35
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt16
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt23
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt)20
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt)36
15 files changed, 231 insertions, 44 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
index fdd1d5d83..cfe436023 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
@@ -248,4 +248,12 @@ object BlueprintConstants {
const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution"
const val NODE_TEMPLATE_TYPE_DG = "dg-generic"
const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates"
+
+ // TAGS
+ const val METRIC_TAG_BP_NAME = "blueprint_name"
+ const val METRIC_TAG_BP_VERSION = "blueprint_version"
+ const val METRIC_TAG_BP_ACTION = "blueprint_action"
+ const val METRIC_TAG_BP_STATUS = "status"
+ const val METRIC_TAG_BP_OUTCOME = "outcome"
+ const val METRIC_TAG_TOPIC = "topic"
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt
new file mode 100644
index 000000000..be84f479f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+object BlueprintMessageMetricConstants {
+
+ private const val METRIC_PREFIX = "cds.kafka"
+
+ private const val PRODUCED_MESSAGES_PREFIX = "$METRIC_PREFIX.produced.messages"
+ private const val CONSUMED_MESSAGES_PREFIX = "$METRIC_PREFIX.consumed.messages"
+
+ // COUNTERS
+ const val KAFKA_PRODUCED_MESSAGES_COUNTER = "$PRODUCED_MESSAGES_PREFIX.total"
+ const val KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER = "$PRODUCED_MESSAGES_PREFIX.error"
+
+ const val KAFKA_CONSUMED_MESSAGES_COUNTER = "$CONSUMED_MESSAGES_PREFIX.total"
+ const val KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER = "$CONSUMED_MESSAGES_PREFIX.error"
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
index 9e0c537cf..cb0bc3225 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.JsonNode
+import io.micrometer.core.instrument.MeterRegistry
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
@@ -36,17 +37,20 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
-open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesService: BlueprintPropertiesService) {
+open class BlueprintMessageLibPropertyService(
+ private var bluePrintPropertiesService: BlueprintPropertiesService,
+ private val meterRegistry: MeterRegistry
+) {
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
val messageClientProperties = messageProducerProperties(jsonNode)
- return KafkaMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties, meterRegistry)
}
fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
val messageClientProperties = messageProducerProperties(prefix)
- return KafkaMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties, meterRegistry)
}
fun messageProducerProperties(prefix: String): MessageProducerProperties {
@@ -184,17 +188,20 @@ open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesSer
/** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
return KafkaMessageConsumerService(
- messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+ messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties,
+ meterRegistry
)
}
MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
return KafkaMessageConsumerService(
- messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+ messageConsumerProperties as KafkaSslAuthMessageConsumerProperties,
+ meterRegistry
)
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
return KafkaMessageConsumerService(
- messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+ messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties,
+ meterRegistry
)
}
/** Stream Consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index af689a1f2..004b476b8 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -24,13 +25,16 @@ import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.time.Duration
import kotlin.concurrent.thread
open class KafkaMessageConsumerService(
- private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
+ private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties,
+ private val meterRegistry: MeterRegistry
) :
BlueprintMessageConsumerService {
@@ -78,6 +82,10 @@ open class KafkaMessageConsumerService(
runBlocking {
consumerRecords?.forEach { consumerRecord ->
launch {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+ ).increment()
/** execute the command block */
if (!channel.isClosedForSend) {
channel.send(consumerRecord)
@@ -89,6 +97,10 @@ open class KafkaMessageConsumerService(
"key(${consumerRecord.key()})"
)
} else {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+ ).increment()
log.error("Channel is closed to receive message")
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 88b0dfaae..21fd84d11 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -18,6 +18,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.node.ObjectNode
+import io.micrometer.core.instrument.MeterRegistry
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
@@ -26,14 +27,17 @@ import org.apache.kafka.common.header.internals.RecordHeader
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
class KafkaMessageProducerService(
- private val messageProducerProperties: MessageProducerProperties
+ private val messageProducerProperties: MessageProducerProperties,
+ private val meterRegistry: MeterRegistry
) :
BlueprintMessageProducerService {
@@ -76,9 +80,17 @@ class KafkaMessageProducerService(
headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
- if (exception != null)
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(topic)
+ ).increment()
+ if (exception != null) {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(topic)
+ ).increment()
log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
- else {
+ } else {
val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
"partition(${metadata.partition()}) " +
"offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
new file mode 100644
index 000000000..7431998d9
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
+
+import io.micrometer.core.instrument.Tag
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+
+class BlueprintMessageUtils {
+ companion object {
+ fun kafkaMetricTag(topic: String): MutableList<Tag> =
+ mutableListOf(
+ Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic)
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index f240f76c0..fb53ff45b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import io.mockk.every
import io.mockk.spyk
import kotlinx.coroutines.channels.consumeEach
@@ -47,6 +48,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -100,6 +102,9 @@ open class BlueprintMessageConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Test
fun testKafkaBasicAuthConsumerService() {
runBlocking {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 2293ceec3..1490a3311 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
@@ -37,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguratio
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -74,6 +77,9 @@ open class BlueprintMessageProducerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Test
fun testKafkaScramSslAuthProducerService() {
runBlocking {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 70968ef0c..b93cd42d4 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@@ -32,6 +34,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfigu
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -74,6 +77,9 @@ class KafkaStreamsConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Test
fun testProperties() {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
new file mode 100644
index 000000000..849a411a6
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
+
+import io.micrometer.core.instrument.Tag
+import org.junit.Test
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import kotlin.test.assertEquals
+
+class BlueprintMessageUtilsTest {
+
+ @Test
+ fun testKafkaMetricTag() {
+ val expected = mutableListOf<Tag>(
+ Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, "my-topic")
+ )
+ val tags = BlueprintMessageUtils.kafkaMetricTag("my-topic")
+
+ assertEquals(expected, tags)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
index 0ab38c6bd..661e76b2b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
@@ -17,13 +17,16 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -43,8 +46,9 @@ import javax.annotation.PreDestroy
)
@Service
open class BlueprintProcessingKafkaConsumer(
- private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService,
- private val executionServiceHandler: ExecutionServiceHandler
+ private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler,
+ private val meterRegistry: MeterRegistry
) {
val log = logger(BlueprintProcessingKafkaConsumer::class)
@@ -69,7 +73,7 @@ open class BlueprintProcessingKafkaConsumer(
/** Get the Message Consumer Service **/
blueprintMessageConsumerService = try {
- bluePrintMessageLibPropertyService
+ blueprintMessageLibPropertyService
.blueprintMessageConsumerService(CONSUMER_SELECTOR)
} catch (e: BlueprintProcessorException) {
val errorMsg = "Failed creating Kafka consumer message service."
@@ -83,7 +87,7 @@ open class BlueprintProcessingKafkaConsumer(
/** Get the Message Producer Service **/
val blueprintMessageProducerService = try {
- bluePrintMessageLibPropertyService
+ blueprintMessageLibPropertyService
.blueprintMessageProducerService(PRODUCER_SELECTOR)
} catch (e: BlueprintProcessorException) {
val errorMsg = "Failed creating Kafka producer message service."
@@ -117,6 +121,10 @@ open class BlueprintProcessingKafkaConsumer(
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(message.topic())
+ ).increment()
log.error("failed in processing the consumed message : $message", e)
} finally {
ph.arriveAndDeregister()
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
index 97c73243d..7c8f4ed5a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
object SelfServiceMetricConstants {
@@ -6,13 +22,6 @@ object SelfServiceMetricConstants {
private const val PROCESS_PREFIX = "$METRICS_PREFIX.process"
- // TAGS
- const val TAG_BP_NAME = "blueprint_name"
- const val TAG_BP_VERSION = "blueprint_version"
- const val TAG_BP_ACTION = "blueprint_action"
- const val TAG_BP_STATUS = "status"
- const val TAG_BP_OUTCOME = "outcome"
-
// COUNTERS
const val COUNTER_PROCESS = "$PROCESS_PREFIX.counter"
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
index aa2938379..c04410a2f 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2019 Bell Canada.
+ * Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
import io.micrometer.core.instrument.Tag
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
import org.springframework.http.HttpStatus
import org.springframework.http.codec.multipart.FilePart
@@ -68,19 +68,19 @@ fun determineHttpStatusCode(statusCode: Int): HttpStatus {
fun cbaMetricTags(executionServiceInput: ExecutionServiceInput): MutableList<Tag> =
executionServiceInput.actionIdentifiers.let {
mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionName)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionName)
)
}
fun cbaMetricTags(executionServiceOutput: ExecutionServiceOutput): MutableList<Tag> =
executionServiceOutput.let {
mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionIdentifiers.actionName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, it.status.code.toString()),
- Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, it.status.message)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionIdentifiers.actionName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, it.status.code.toString()),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, it.status.message)
)
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
index 3a5cebc61..56cc691a4 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -65,7 +66,8 @@ class BlueprintProcessingKafkaConsumerTest {
val bluePrintProcessingKafkaConsumer = BlueprintProcessingKafkaConsumer(
bluePrintMessageLibPropertyService,
- executionServiceHandle
+ executionServiceHandle,
+ meterRegistry
)
launch {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
index 223896885..10db349e6 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
import io.micrometer.core.instrument.Tag
@@ -8,12 +24,12 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.springframework.http.HttpStatus
import org.springframework.test.context.junit4.SpringRunner
@RunWith(SpringRunner::class)
-class UtilsTest {
+class BlueprintProcessingUtilsTest {
@Test
fun `valid Http status codes should be produced for valid parameters`() {
@@ -41,9 +57,9 @@ class UtilsTest {
}
val expectedTags = mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
)
val metricTag = cbaMetricTags(executionServiceInput)
@@ -66,11 +82,11 @@ class UtilsTest {
}
val expectedTags = mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
- Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, executionServiceOutput.status.message)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, executionServiceOutput.status.message)
)
val metricTag = cbaMetricTags(executionServiceOutput)