diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2021-02-15 18:31:24 -0500 |
---|---|---|
committer | KAPIL SINGAL <ks220y@att.com> | 2021-03-16 19:17:05 +0000 |
commit | ae588292c67e20c5cae8cb3c899957aac79a676d (patch) | |
tree | fe14757bc84ca95a38ed7d82a2cb9db6b5825c5f /ms/blueprintsprocessor/modules/commons/message-lib/src/main | |
parent | 7f858b082287017c2f28b7fac03476c5f761a517 (diff) |
Assign a unique worker ID for CDS Kafka worker
* Modified CDS Kafka workers to add the 5 lasts characters of the env var HOSTNAME to their worker ID.
* Small refactoring to move some utilitary functions to BlueprintMessageUtils
Issue-ID: CCSDK-3204
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Iaacd35e9cbe4705d17548518040c679185eaf30a
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main')
3 files changed, 47 insertions, 33 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt index 67dba1f19..3e7db9597 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt @@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.StreamsConfig +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils /** Common Properties **/ abstract class CommonProperties { @@ -51,7 +52,7 @@ abstract class MessageProducerProperties : CommonProperties() /** Basic Auth */ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { - var clientId: String? = null + lateinit var clientId: String var acks: String = "all" // strongest producing guarantee var maxBlockMs: Int = 250 // max blocking time in ms to send a message var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour) @@ -65,9 +66,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence - if (clientId != null) { - configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! - } + configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}" return configProps } } @@ -87,8 +86,8 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer val configProps = super.getConfig() configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword if (keystore != null) { configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType @@ -132,7 +131,9 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() override fun getConfig(): HashMap<String, Any> { val configProperties = super.getConfig() - configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId + // adjust the worker name with the hostname suffix because we'll have several workers, running in + // different pods, using the same worker name otherwise. + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}" configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee return configProperties @@ -154,8 +155,8 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer val configProps = super.getConfig() configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword if (keystore != null) { configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType @@ -207,7 +208,9 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId + // adjust the worker name with the hostname suffix because we'll have several workers, running in + // different pods, using the same worker name otherwise. + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}" /** To handle Back pressure, Get only configured record for processing */ if (pollRecords > 0) { @@ -233,8 +236,8 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer val configProps = super.getConfig() configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword if (keystore != null) { configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 21fd84d11..d40067f4e 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants @@ -89,12 +88,13 @@ class KafkaMessageProducerService( BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER, BlueprintMessageUtils.kafkaMetricTag(topic) ).increment() - log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) + log.error("Couldn't publish ${clonedMessage::class.simpleName} ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}.", exception) } else { - val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + - "partition(${metadata.partition()}) " + - "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." - log.info(message) + log.info( + "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + + "partition(${metadata.partition()}) " + + "offset(${metadata.offset()}) ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}." + ) } } messageTemplate().send(record, callback) @@ -104,7 +104,6 @@ class KafkaMessageProducerService( fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = messageProducerProperties.getConfig() - /** Add additional Properties */ if (additionalConfig != null) configProps.putAll(additionalConfig) @@ -147,18 +146,4 @@ class KafkaMessageProducerService( stepData = executionServiceOutput.stepData } } - - private fun getMessageLogData(message: Any): String { - return when (message) { - is ExecutionServiceInput -> { - val actionIdentifiers = message.actionIdentifiers - "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" - } - is ExecutionServiceOutput -> { - val actionIdentifiers = message.actionIdentifiers - "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" - } - else -> "message($message)" - } - } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt index 7431998d9..b4817cd8a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt @@ -17,7 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.utils import io.micrometer.core.instrument.Tag +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import kotlin.math.max class BlueprintMessageUtils { companion object { @@ -25,5 +29,27 @@ class BlueprintMessageUtils { mutableListOf( Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic) ) + + /** + * get OS hostname's last 5 characters + * Used to generate unique client ID. + */ + fun getHostnameSuffix(): String = + System.getenv("HOSTNAME").defaultToUUID().let { + it.substring(max(0, it.length - 5)) + } + + fun getMessageLogData(message: Any): String = + when (message) { + is ExecutionServiceInput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + is ExecutionServiceOutput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + else -> "message($message)" + } } } |