diff options
7 files changed, 113 insertions, 37 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt index 67dba1f19..3e7db9597 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt @@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.StreamsConfig +import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils /** Common Properties **/ abstract class CommonProperties { @@ -51,7 +52,7 @@ abstract class MessageProducerProperties : CommonProperties() /** Basic Auth */ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { - var clientId: String? = null + lateinit var clientId: String var acks: String = "all" // strongest producing guarantee var maxBlockMs: Int = 250 // max blocking time in ms to send a message var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour) @@ -65,9 +66,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence - if (clientId != null) { - configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! - } + configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}" return configProps } } @@ -87,8 +86,8 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer val configProps = super.getConfig() configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword if (keystore != null) { configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType @@ -132,7 +131,9 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() override fun getConfig(): HashMap<String, Any> { val configProperties = super.getConfig() - configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId + // adjust the worker name with the hostname suffix because we'll have several workers, running in + // different pods, using the same worker name otherwise. + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}" configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee return configProperties @@ -154,8 +155,8 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer val configProps = super.getConfig() configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword if (keystore != null) { configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType @@ -207,7 +208,9 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId + // adjust the worker name with the hostname suffix because we'll have several workers, running in + // different pods, using the same worker name otherwise. + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}" /** To handle Back pressure, Get only configured record for processing */ if (pollRecords > 0) { @@ -233,8 +236,8 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer val configProps = super.getConfig() configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword if (keystore != null) { configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 21fd84d11..d40067f4e 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants @@ -89,12 +88,13 @@ class KafkaMessageProducerService( BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER, BlueprintMessageUtils.kafkaMetricTag(topic) ).increment() - log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) + log.error("Couldn't publish ${clonedMessage::class.simpleName} ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}.", exception) } else { - val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + - "partition(${metadata.partition()}) " + - "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." - log.info(message) + log.info( + "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + + "partition(${metadata.partition()}) " + + "offset(${metadata.offset()}) ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}." + ) } } messageTemplate().send(record, callback) @@ -104,7 +104,6 @@ class KafkaMessageProducerService( fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = messageProducerProperties.getConfig() - /** Add additional Properties */ if (additionalConfig != null) configProps.putAll(additionalConfig) @@ -147,18 +146,4 @@ class KafkaMessageProducerService( stepData = executionServiceOutput.stepData } } - - private fun getMessageLogData(message: Any): String { - return when (message) { - is ExecutionServiceInput -> { - val actionIdentifiers = message.actionIdentifiers - "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" - } - is ExecutionServiceOutput -> { - val actionIdentifiers = message.actionIdentifiers - "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" - } - else -> "message($message)" - } - } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt index 7431998d9..b4817cd8a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt @@ -17,7 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.utils import io.micrometer.core.instrument.Tag +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import kotlin.math.max class BlueprintMessageUtils { companion object { @@ -25,5 +29,27 @@ class BlueprintMessageUtils { mutableListOf( Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic) ) + + /** + * get OS hostname's last 5 characters + * Used to generate unique client ID. + */ + fun getHostnameSuffix(): String = + System.getenv("HOSTNAME").defaultToUUID().let { + it.substring(max(0, it.length - 5)) + } + + fun getMessageLogData(message: Any): String = + when (message) { + is ExecutionServiceInput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + is ExecutionServiceOutput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + else -> "message($message)" + } } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index fb53ff45b..15b73539e 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -212,7 +212,6 @@ open class BlueprintMessageConsumerServiceTest { @Test fun testKafkaScramSslAuthConfig() { - val expectedConfig = mapOf<String, Any>( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", ConsumerConfig.GROUP_ID_CONFIG to "sample-group", @@ -220,7 +219,6 @@ open class BlueprintMessageConsumerServiceTest { ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", @@ -251,6 +249,15 @@ open class BlueprintMessageConsumerServiceTest { "Authentication type doesn't match the expected value" ) + assertTrue( + configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG), + "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}" + ) + assertTrue( + configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"), + "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id" + ) + expectedConfig.forEach { assertTrue( configProps.containsKey(it.key), diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 1490a3311..30021a6de 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -112,7 +112,6 @@ open class BlueprintMessageProducerServiceTest { ProducerConfig.MAX_BLOCK_MS_CONFIG to 250, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, - ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", @@ -143,6 +142,15 @@ open class BlueprintMessageProducerServiceTest { "Authentication type doesn't match the expected value" ) + assertTrue( + configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG), + "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}" + ) + assertTrue( + configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"), + "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id" + ) + expectedConfig.forEach { assertTrue( configProps.containsKey(it.key), diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt index 849a411a6..de9ca2c44 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt @@ -17,8 +17,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.utils import io.micrometer.core.instrument.Tag +import io.mockk.every +import io.mockk.mockkStatic import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants + import kotlin.test.assertEquals class BlueprintMessageUtilsTest { @@ -32,4 +37,46 @@ class BlueprintMessageUtilsTest { assertEquals(expected, tags) } + + @Test + fun testGetHostnameSuffix() { + mockkStatic(System::class) + every { System.getenv("HOSTNAME") } returns "qwertyuiop" + assertEquals("yuiop", BlueprintMessageUtils.getHostnameSuffix()) + } + + @Test + fun testGetNullHostnameSuffix() { + mockkStatic(System::class) + every { System.getenv("HOSTNAME") } returns null + assertEquals(5, BlueprintMessageUtils.getHostnameSuffix().length) + } + + @Test + fun testGetMessageLogData() { + val input = ExecutionServiceInput().apply { + actionIdentifiers = ActionIdentifiers().apply { + blueprintName = "bpInput" + blueprintVersion = "1.0.0-input" + actionName = "bpActionInput" + } + } + val expectedOnInput = "CBA(bpInput/1.0.0-input/bpActionInput)" + + val output = ExecutionServiceInput().apply { + actionIdentifiers = ActionIdentifiers().apply { + blueprintName = "bpOutput" + blueprintVersion = "1.0.0-output" + actionName = "bpActionOutput" + } + } + val expectedOnOutput = "CBA(bpOutput/1.0.0-output/bpActionOutput)" + + val otherMessage = "some other message" + val expectedOnOtherMessage = "message(some other message)" + + assertEquals(expectedOnInput, BlueprintMessageUtils.getMessageLogData(input)) + assertEquals(expectedOnOutput, BlueprintMessageUtils.getMessageLogData(output)) + assertEquals(expectedOnOtherMessage, BlueprintMessageUtils.getMessageLogData(otherMessage)) + } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt index 661e76b2b..82fd78efe 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt @@ -116,7 +116,7 @@ open class BlueprintProcessingKafkaConsumer( "leaderEpoch(${message.leaderEpoch().get()}) " + "offset(${message.offset()}) " + "key(${message.key()}) " + - "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})" + BlueprintMessageUtils.getMessageLogData(executionServiceInput) ) val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) blueprintMessageProducerService.sendMessage(key, executionServiceOutput) |