aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2021-02-15 18:31:24 -0500
committerKAPIL SINGAL <ks220y@att.com>2021-03-16 19:17:05 +0000
commitae588292c67e20c5cae8cb3c899957aac79a676d (patch)
treefe14757bc84ca95a38ed7d82a2cb9db6b5825c5f /ms/blueprintsprocessor/modules/commons/message-lib/src/main
parent7f858b082287017c2f28b7fac03476c5f761a517 (diff)
Assign a unique worker ID for CDS Kafka worker
* Modified CDS Kafka workers to add the 5 lasts characters of the env var HOSTNAME to their worker ID. * Small refactoring to move some utilitary functions to BlueprintMessageUtils Issue-ID: CCSDK-3204 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: Iaacd35e9cbe4705d17548518040c679185eaf30a
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt26
3 files changed, 47 insertions, 33 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
index 67dba1f19..3e7db9597 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.StreamsConfig
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
/** Common Properties **/
abstract class CommonProperties {
@@ -51,7 +52,7 @@ abstract class MessageProducerProperties : CommonProperties()
/** Basic Auth */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
- var clientId: String? = null
+ lateinit var clientId: String
var acks: String = "all" // strongest producing guarantee
var maxBlockMs: Int = 250 // max blocking time in ms to send a message
var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
@@ -65,9 +66,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
- if (clientId != null) {
- configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
- }
+ configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
return configProps
}
}
@@ -87,8 +86,8 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
val configProps = super.getConfig()
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
- configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
- configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
if (keystore != null) {
configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
@@ -132,7 +131,9 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties()
override fun getConfig(): HashMap<String, Any> {
val configProperties = super.getConfig()
- configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
+ // adjust the worker name with the hostname suffix because we'll have several workers, running in
+ // different pods, using the same worker name otherwise.
+ configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
return configProperties
@@ -154,8 +155,8 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
val configProps = super.getConfig()
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
- configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
- configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
if (keystore != null) {
configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
@@ -207,7 +208,9 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties()
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
+ // adjust the worker name with the hostname suffix because we'll have several workers, running in
+ // different pods, using the same worker name otherwise.
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
/** To handle Back pressure, Get only configured record for processing */
if (pollRecords > 0) {
@@ -233,8 +236,8 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
val configProps = super.getConfig()
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
- configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
- configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
if (keystore != null) {
configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 21fd84d11..d40067f4e 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
@@ -89,12 +88,13 @@ class KafkaMessageProducerService(
BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
BlueprintMessageUtils.kafkaMetricTag(topic)
).increment()
- log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
+ log.error("Couldn't publish ${clonedMessage::class.simpleName} ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}.", exception)
} else {
- val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
- "partition(${metadata.partition()}) " +
- "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
- log.info(message)
+ log.info(
+ "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+ "partition(${metadata.partition()}) " +
+ "offset(${metadata.offset()}) ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}."
+ )
}
}
messageTemplate().send(record, callback)
@@ -104,7 +104,6 @@ class KafkaMessageProducerService(
fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
val configProps = messageProducerProperties.getConfig()
-
/** Add additional Properties */
if (additionalConfig != null)
configProps.putAll(additionalConfig)
@@ -147,18 +146,4 @@ class KafkaMessageProducerService(
stepData = executionServiceOutput.stepData
}
}
-
- private fun getMessageLogData(message: Any): String {
- return when (message) {
- is ExecutionServiceInput -> {
- val actionIdentifiers = message.actionIdentifiers
- "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
- }
- is ExecutionServiceOutput -> {
- val actionIdentifiers = message.actionIdentifiers
- "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
- }
- else -> "message($message)"
- }
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
index 7431998d9..b4817cd8a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
@@ -17,7 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
import io.micrometer.core.instrument.Tag
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import kotlin.math.max
class BlueprintMessageUtils {
companion object {
@@ -25,5 +29,27 @@ class BlueprintMessageUtils {
mutableListOf(
Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic)
)
+
+ /**
+ * get OS hostname's last 5 characters
+ * Used to generate unique client ID.
+ */
+ fun getHostnameSuffix(): String =
+ System.getenv("HOSTNAME").defaultToUUID().let {
+ it.substring(max(0, it.length - 5))
+ }
+
+ fun getMessageLogData(message: Any): String =
+ when (message) {
+ is ExecutionServiceInput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ is ExecutionServiceOutput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ else -> "message($message)"
+ }
}
}