aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt16
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt23
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt)20
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt)36
5 files changed, 67 insertions, 32 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
index 0ab38c6bd..661e76b2b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
@@ -17,13 +17,16 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -43,8 +46,9 @@ import javax.annotation.PreDestroy
)
@Service
open class BlueprintProcessingKafkaConsumer(
- private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService,
- private val executionServiceHandler: ExecutionServiceHandler
+ private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler,
+ private val meterRegistry: MeterRegistry
) {
val log = logger(BlueprintProcessingKafkaConsumer::class)
@@ -69,7 +73,7 @@ open class BlueprintProcessingKafkaConsumer(
/** Get the Message Consumer Service **/
blueprintMessageConsumerService = try {
- bluePrintMessageLibPropertyService
+ blueprintMessageLibPropertyService
.blueprintMessageConsumerService(CONSUMER_SELECTOR)
} catch (e: BlueprintProcessorException) {
val errorMsg = "Failed creating Kafka consumer message service."
@@ -83,7 +87,7 @@ open class BlueprintProcessingKafkaConsumer(
/** Get the Message Producer Service **/
val blueprintMessageProducerService = try {
- bluePrintMessageLibPropertyService
+ blueprintMessageLibPropertyService
.blueprintMessageProducerService(PRODUCER_SELECTOR)
} catch (e: BlueprintProcessorException) {
val errorMsg = "Failed creating Kafka producer message service."
@@ -117,6 +121,10 @@ open class BlueprintProcessingKafkaConsumer(
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
+ meterRegistry.counter(
+ BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+ BlueprintMessageUtils.kafkaMetricTag(message.topic())
+ ).increment()
log.error("failed in processing the consumed message : $message", e)
} finally {
ph.arriveAndDeregister()
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
index 97c73243d..7c8f4ed5a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
object SelfServiceMetricConstants {
@@ -6,13 +22,6 @@ object SelfServiceMetricConstants {
private const val PROCESS_PREFIX = "$METRICS_PREFIX.process"
- // TAGS
- const val TAG_BP_NAME = "blueprint_name"
- const val TAG_BP_VERSION = "blueprint_version"
- const val TAG_BP_ACTION = "blueprint_action"
- const val TAG_BP_STATUS = "status"
- const val TAG_BP_OUTCOME = "outcome"
-
// COUNTERS
const val COUNTER_PROCESS = "$PROCESS_PREFIX.counter"
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
index aa2938379..c04410a2f 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2019 Bell Canada.
+ * Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
import io.micrometer.core.instrument.Tag
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
import org.springframework.http.HttpStatus
import org.springframework.http.codec.multipart.FilePart
@@ -68,19 +68,19 @@ fun determineHttpStatusCode(statusCode: Int): HttpStatus {
fun cbaMetricTags(executionServiceInput: ExecutionServiceInput): MutableList<Tag> =
executionServiceInput.actionIdentifiers.let {
mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionName)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionName)
)
}
fun cbaMetricTags(executionServiceOutput: ExecutionServiceOutput): MutableList<Tag> =
executionServiceOutput.let {
mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionIdentifiers.actionName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, it.status.code.toString()),
- Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, it.status.message)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionIdentifiers.actionName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, it.status.code.toString()),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, it.status.message)
)
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
index 3a5cebc61..56cc691a4 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -65,7 +66,8 @@ class BlueprintProcessingKafkaConsumerTest {
val bluePrintProcessingKafkaConsumer = BlueprintProcessingKafkaConsumer(
bluePrintMessageLibPropertyService,
- executionServiceHandle
+ executionServiceHandle,
+ meterRegistry
)
launch {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
index 223896885..10db349e6 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
import io.micrometer.core.instrument.Tag
@@ -8,12 +24,12 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.springframework.http.HttpStatus
import org.springframework.test.context.junit4.SpringRunner
@RunWith(SpringRunner::class)
-class UtilsTest {
+class BlueprintProcessingUtilsTest {
@Test
fun `valid Http status codes should be produced for valid parameters`() {
@@ -41,9 +57,9 @@ class UtilsTest {
}
val expectedTags = mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
)
val metricTag = cbaMetricTags(executionServiceInput)
@@ -66,11 +82,11 @@ class UtilsTest {
}
val expectedTags = mutableListOf(
- Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
- Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
- Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
- Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, executionServiceOutput.status.message)
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
+ Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, executionServiceOutput.status.message)
)
val metricTag = cbaMetricTags(executionServiceOutput)