aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2021-02-15 18:31:24 -0500
committerKAPIL SINGAL <ks220y@att.com>2021-03-16 19:17:05 +0000
commitae588292c67e20c5cae8cb3c899957aac79a676d (patch)
treefe14757bc84ca95a38ed7d82a2cb9db6b5825c5f /ms/blueprintsprocessor/modules/commons/message-lib
parent7f858b082287017c2f28b7fac03476c5f761a517 (diff)
Assign a unique worker ID for CDS Kafka worker
* Modified CDS Kafka workers to add the 5 lasts characters of the env var HOSTNAME to their worker ID. * Small refactoring to move some utilitary functions to BlueprintMessageUtils Issue-ID: CCSDK-3204 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: Iaacd35e9cbe4705d17548518040c679185eaf30a
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt26
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt11
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt10
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt47
6 files changed, 112 insertions, 36 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
index 67dba1f19..3e7db9597 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.StreamsConfig
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
/** Common Properties **/
abstract class CommonProperties {
@@ -51,7 +52,7 @@ abstract class MessageProducerProperties : CommonProperties()
/** Basic Auth */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
- var clientId: String? = null
+ lateinit var clientId: String
var acks: String = "all" // strongest producing guarantee
var maxBlockMs: Int = 250 // max blocking time in ms to send a message
var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
@@ -65,9 +66,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
- if (clientId != null) {
- configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
- }
+ configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
return configProps
}
}
@@ -87,8 +86,8 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
val configProps = super.getConfig()
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
- configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
- configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
if (keystore != null) {
configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
@@ -132,7 +131,9 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties()
override fun getConfig(): HashMap<String, Any> {
val configProperties = super.getConfig()
- configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
+ // adjust the worker name with the hostname suffix because we'll have several workers, running in
+ // different pods, using the same worker name otherwise.
+ configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
return configProperties
@@ -154,8 +155,8 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
val configProps = super.getConfig()
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
- configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
- configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
if (keystore != null) {
configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
@@ -207,7 +208,9 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties()
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
+ // adjust the worker name with the hostname suffix because we'll have several workers, running in
+ // different pods, using the same worker name otherwise.
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
/** To handle Back pressure, Get only configured record for processing */
if (pollRecords > 0) {
@@ -233,8 +236,8 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
val configProps = super.getConfig()
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
- configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
- configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
if (keystore != null) {
configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 21fd84d11..d40067f4e 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
@@ -89,12 +88,13 @@ class KafkaMessageProducerService(
BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
BlueprintMessageUtils.kafkaMetricTag(topic)
).increment()
- log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
+ log.error("Couldn't publish ${clonedMessage::class.simpleName} ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}.", exception)
} else {
- val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
- "partition(${metadata.partition()}) " +
- "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
- log.info(message)
+ log.info(
+ "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+ "partition(${metadata.partition()}) " +
+ "offset(${metadata.offset()}) ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}."
+ )
}
}
messageTemplate().send(record, callback)
@@ -104,7 +104,6 @@ class KafkaMessageProducerService(
fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
val configProps = messageProducerProperties.getConfig()
-
/** Add additional Properties */
if (additionalConfig != null)
configProps.putAll(additionalConfig)
@@ -147,18 +146,4 @@ class KafkaMessageProducerService(
stepData = executionServiceOutput.stepData
}
}
-
- private fun getMessageLogData(message: Any): String {
- return when (message) {
- is ExecutionServiceInput -> {
- val actionIdentifiers = message.actionIdentifiers
- "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
- }
- is ExecutionServiceOutput -> {
- val actionIdentifiers = message.actionIdentifiers
- "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
- }
- else -> "message($message)"
- }
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
index 7431998d9..b4817cd8a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
@@ -17,7 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
import io.micrometer.core.instrument.Tag
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import kotlin.math.max
class BlueprintMessageUtils {
companion object {
@@ -25,5 +29,27 @@ class BlueprintMessageUtils {
mutableListOf(
Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic)
)
+
+ /**
+ * get OS hostname's last 5 characters
+ * Used to generate unique client ID.
+ */
+ fun getHostnameSuffix(): String =
+ System.getenv("HOSTNAME").defaultToUUID().let {
+ it.substring(max(0, it.length - 5))
+ }
+
+ fun getMessageLogData(message: Any): String =
+ when (message) {
+ is ExecutionServiceInput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ is ExecutionServiceOutput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ else -> "message($message)"
+ }
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index fb53ff45b..15b73539e 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -212,7 +212,6 @@ open class BlueprintMessageConsumerServiceTest {
@Test
fun testKafkaScramSslAuthConfig() {
-
val expectedConfig = mapOf<String, Any>(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
@@ -220,7 +219,6 @@ open class BlueprintMessageConsumerServiceTest {
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
@@ -251,6 +249,15 @@ open class BlueprintMessageConsumerServiceTest {
"Authentication type doesn't match the expected value"
)
+ assertTrue(
+ configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+ "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+ )
+ assertTrue(
+ configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+ "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+ )
+
expectedConfig.forEach {
assertTrue(
configProps.containsKey(it.key),
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 1490a3311..30021a6de 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -112,7 +112,6 @@ open class BlueprintMessageProducerServiceTest {
ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
- ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
@@ -143,6 +142,15 @@ open class BlueprintMessageProducerServiceTest {
"Authentication type doesn't match the expected value"
)
+ assertTrue(
+ configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+ "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+ )
+ assertTrue(
+ configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+ "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+ )
+
expectedConfig.forEach {
assertTrue(
configProps.containsKey(it.key),
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
index 849a411a6..de9ca2c44 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
@@ -17,8 +17,13 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
import io.micrometer.core.instrument.Tag
+import io.mockk.every
+import io.mockk.mockkStatic
import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+
import kotlin.test.assertEquals
class BlueprintMessageUtilsTest {
@@ -32,4 +37,46 @@ class BlueprintMessageUtilsTest {
assertEquals(expected, tags)
}
+
+ @Test
+ fun testGetHostnameSuffix() {
+ mockkStatic(System::class)
+ every { System.getenv("HOSTNAME") } returns "qwertyuiop"
+ assertEquals("yuiop", BlueprintMessageUtils.getHostnameSuffix())
+ }
+
+ @Test
+ fun testGetNullHostnameSuffix() {
+ mockkStatic(System::class)
+ every { System.getenv("HOSTNAME") } returns null
+ assertEquals(5, BlueprintMessageUtils.getHostnameSuffix().length)
+ }
+
+ @Test
+ fun testGetMessageLogData() {
+ val input = ExecutionServiceInput().apply {
+ actionIdentifiers = ActionIdentifiers().apply {
+ blueprintName = "bpInput"
+ blueprintVersion = "1.0.0-input"
+ actionName = "bpActionInput"
+ }
+ }
+ val expectedOnInput = "CBA(bpInput/1.0.0-input/bpActionInput)"
+
+ val output = ExecutionServiceInput().apply {
+ actionIdentifiers = ActionIdentifiers().apply {
+ blueprintName = "bpOutput"
+ blueprintVersion = "1.0.0-output"
+ actionName = "bpActionOutput"
+ }
+ }
+ val expectedOnOutput = "CBA(bpOutput/1.0.0-output/bpActionOutput)"
+
+ val otherMessage = "some other message"
+ val expectedOnOtherMessage = "message(some other message)"
+
+ assertEquals(expectedOnInput, BlueprintMessageUtils.getMessageLogData(input))
+ assertEquals(expectedOnOutput, BlueprintMessageUtils.getMessageLogData(output))
+ assertEquals(expectedOnOtherMessage, BlueprintMessageUtils.getMessageLogData(otherMessage))
+ }
}