diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2020-04-13 16:42:36 -0400 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2020-04-21 13:57:31 -0400 |
commit | 8a2eb4ae98beb70eac4e5fa4bb2e786c6a9513d2 (patch) | |
tree | d953dcfa97e409332553d58759beb34ae3efa7a7 /ms/blueprintsprocessor/modules/commons/message-lib/src | |
parent | 6e7cbbbc4668c9d37d44bab6625ab7275043eb72 (diff) |
Secure Kafka Authentication
Implementation of kafka secure authentication :
- SSL
- SASL(SCRAM) & SSL
Issue-ID: CCSDK-2313
Change-Id: I4b2fc7abab7478e360ebf461608a620d75708f54
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
11 files changed, 794 insertions, 168 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index cc4c7fa4a..c6587c799 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -61,6 +61,10 @@ class MessageLibConstants { const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth" + const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth" const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" + const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth" + const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth" } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 005223d9b..ac35fbf2c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -17,49 +17,238 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.StreamsConfig -/** Producer Properties **/ -open class MessageProducerProperties { +/** Common Properties **/ +abstract class CommonProperties { lateinit var type: String + lateinit var topic: String + lateinit var bootstrapServers: String + + open fun getConfig(): HashMap<String, Any> { + val configProps = hashMapOf<String, Any>() + configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + return configProps + } } +/** Message Producer */ +/** Message Producer Properties **/ +abstract class MessageProducerProperties : CommonProperties() + +/** Basic Auth */ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { - lateinit var bootstrapServers: String - var topic: String? = null + var clientId: String? = null // strongest producing guarantee var acks: String = "all" var retries: Int = 0 // ensure we don't push duplicates var enableIdempotence: Boolean = true + + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java + configProps[ProducerConfig.ACKS_CONFIG] = acks + configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence + if (clientId != null) { + configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! + } + return configProps + } } -/** Consumer Properties **/ +/** SSL Auth */ +open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" -open class MessageConsumerProperties { - lateinit var type: String + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + + return configProps + } } -open class KafkaStreamsConsumerProperties : MessageConsumerProperties() { - lateinit var bootstrapServers: String +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Consumer */ +abstract class MessageConsumerProperties : CommonProperties() +/** Kafka Streams */ +/** Streams properties */ + +/** Basic Auth */ +open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() { lateinit var applicationId: String - lateinit var topic: String var autoOffsetReset: String = "latest" var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE + + override fun getConfig(): HashMap<String, Any> { + val configProperties = super.getConfig() + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee + return configProperties + } } -open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties() +/** SSL Auth */ +open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" -open class KafkaMessageConsumerProperties : MessageConsumerProperties() { - lateinit var bootstrapServers: String + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Message Consumer */ +/** Message Consumer Properties **/ +/** Basic Auth */ +open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() { lateinit var groupId: String lateinit var clientId: String - var topic: String? = null var autoCommit: Boolean = true var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 + + override fun getConfig(): HashMap<String, Any> { + val configProperties = super.getConfig() + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId + + /** To handle Back pressure, Get only configured record for processing */ + if (pollRecords > 0) { + configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords + } + + return configProperties + } } -open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() +/** SSL Auth */ +open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" + + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap<String, Any> { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index 88039466d..c659fdb8b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -99,6 +99,20 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block) ) } + + fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageProducerProperties(block) + ) + } + + fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block) + ) + } } fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { @@ -108,9 +122,23 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth return assignments.asJsonType() } +fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaSslAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaScramSslAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() -class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { +open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) @@ -141,6 +169,61 @@ class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducer property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence) } +open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword) +} + /** Relationships Templates DSL for Message Consumer */ fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer( name: String, @@ -166,12 +249,40 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri ) } + fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block) + ) + } + + fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block) + ) + } + fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( BluePrintConstants.PROPERTY_CONNECTION_CONFIG, BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block) ) } + + fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block) + ) + } + + fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) + ) + } } fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { @@ -181,6 +292,20 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth return assignments.asJsonType() } +fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaSslAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] = @@ -188,81 +313,201 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa return assignments.asJsonType() } +fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsSslAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() -open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { +/** KafkaBasicAuthMessageConsumerProperties assignment builder */ +open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) fun bootstrapServers(bootstrapServers: JsonNode) = - property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers) + property(KafkaBasicAuthMessageConsumerProperties::bootstrapServers, bootstrapServers) fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive()) fun groupId(groupId: JsonNode) = - property(KafkaMessageConsumerProperties::groupId, groupId) + property(KafkaBasicAuthMessageConsumerProperties::groupId, groupId) fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive()) fun clientId(clientId: JsonNode) = - property(KafkaMessageConsumerProperties::clientId, clientId) + property(KafkaBasicAuthMessageConsumerProperties::clientId, clientId) fun topic(topic: String) = topic(topic.asJsonPrimitive()) fun topic(topic: JsonNode) = - property(KafkaMessageConsumerProperties::topic, topic) + property(KafkaBasicAuthMessageConsumerProperties::topic, topic) fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive()) fun autoCommit(autoCommit: JsonNode) = - property(KafkaMessageConsumerProperties::autoCommit, autoCommit) + property(KafkaBasicAuthMessageConsumerProperties::autoCommit, autoCommit) fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) fun autoOffsetReset(autoOffsetReset: JsonNode) = - property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset) + property(KafkaBasicAuthMessageConsumerProperties::autoOffsetReset, autoOffsetReset) fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive()) fun pollMillSec(pollMillSec: JsonNode) = - property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec) + property(KafkaBasicAuthMessageConsumerProperties::pollMillSec, pollMillSec) fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive()) fun pollRecords(pollRecords: JsonNode) = - property(KafkaMessageConsumerProperties::pollRecords, pollRecords) + property(KafkaBasicAuthMessageConsumerProperties::pollRecords, pollRecords) } -/** KafkaBasicAuthMessageConsumerProperties assignment builder */ -class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder() +open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword) +} /** KafkaStreamsConsumerProperties assignment builder */ -open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { +open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) fun bootstrapServers(bootstrapServers: JsonNode) = - property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers) + property(KafkaStreamsBasicAuthConsumerProperties::bootstrapServers, bootstrapServers) fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive()) fun applicationId(applicationId: JsonNode) = - property(KafkaStreamsConsumerProperties::applicationId, applicationId) + property(KafkaStreamsBasicAuthConsumerProperties::applicationId, applicationId) fun topic(topic: String) = topic(topic.asJsonPrimitive()) fun topic(topic: JsonNode) = - property(KafkaStreamsConsumerProperties::topic, topic) + property(KafkaStreamsBasicAuthConsumerProperties::topic, topic) fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) fun autoOffsetReset(autoOffsetReset: JsonNode) = - property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset) + property(KafkaStreamsBasicAuthConsumerProperties::autoOffsetReset, autoOffsetReset) fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive()) fun processingGuarantee(processingGuarantee: JsonNode) = - property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee) + property(KafkaStreamsBasicAuthConsumerProperties::processingGuarantee, processingGuarantee) } -class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder() +open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword) +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 44b50af44..67fbef580 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties @@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { val messageClientProperties = messageProducerProperties(jsonNode) - return blueprintMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties) } fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" val messageClientProperties = messageProducerProperties(prefix) - return blueprintMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties) } fun messageProducerProperties(prefix: String): MessageProducerProperties { val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageProducerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageProducerProperties::class.java + ) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!! } - else -> { - throw BluePrintProcessorException("Message adaptor($type) is not supported") + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!! } - } - } - - private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties): - BlueprintMessageProducerService { - - when (MessageProducerProperties) { - is KafkaBasicAuthMessageProducerProperties -> { - return KafkaBasicAuthMessageProducerService(MessageProducerProperties) + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!! } else -> { - throw BluePrintProcessorException("couldn't get Message client service for") + throw BluePrintProcessorException("Message adaptor($type) is not supported") } } } - private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageProducerProperties::class.java - ) - } - /** Consumer Property Lib Service Implementation **/ /** Return Message Consumer Service for [jsonNode] definitions. */ @@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun messageConsumerProperties(prefix: String): MessageConsumerProperties { val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { + /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageConsumerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageConsumerProperties::class.java + ) } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageConsumerProperties::class.java + ) + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - kafkaStreamsBasicAuthMessageConsumerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsSslAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java + ) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { val type = jsonNode.get("type").textValue() return when (type) { + /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!! + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!! + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): BlueprintMessageConsumerService { - when (messageConsumerProperties) { - is KafkaBasicAuthMessageConsumerProperties -> { - return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) + when (messageConsumerProperties.type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties + ) } - is KafkaStreamsBasicAuthConsumerProperties -> { - return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties) + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties + ) } else -> { - throw BluePrintProcessorException("couldn't get Message client service for") + throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") } } } - - private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageConsumerProperties::class.java - ) - } - - private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsBasicAuthConsumerProperties::class.java - ) - } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index 3415c8d0d..cdcd4197c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -21,24 +21,20 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.serialization.StringDeserializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread -open class KafkaBasicAuthMessageConsumerService( +open class KafkaMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties ) : BlueprintMessageConsumerService { - val log = logger(KafkaBasicAuthMessageConsumerService::class) + val log = logger(KafkaMessageConsumerService::class) val channel = Channel<String>() var kafkaConsumer: Consumer<String, ByteArray>? = null @@ -46,24 +42,7 @@ open class KafkaBasicAuthMessageConsumerService( var keepGoing = true fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> { - val configProperties = hashMapOf<String, Any>() - configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit - /** - * earliest: automatically reset the offset to the earliest offset - * latest: automatically reset the offset to the latest offset - */ - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId - - /** To handle Back pressure, Get only configured record for processing */ - if (messageConsumerProperties.pollRecords > 0) { - configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords - } - // TODO("Security Implementation based on type") + val configProperties = messageConsumerProperties.getConfig() /** add or override already set properties */ additionalConfig?.let { configProperties.putAll(it) } /** Create Kafka consumer */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 8416282af..931f052ed 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -20,28 +20,20 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringSerializer -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory import java.nio.charset.Charset -class KafkaBasicAuthMessageProducerService( - private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties +class KafkaMessageProducerService( + private val messageProducerProperties: MessageProducerProperties ) : BlueprintMessageProducerService { - private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! + private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!! private var kafkaProducer: KafkaProducer<String, ByteArray>? = null @@ -81,26 +73,16 @@ class KafkaBasicAuthMessageProducerService( } fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { - log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") - val configProps = hashMapOf<String, Any>() - configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers - configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java - configProps[ACKS_CONFIG] = messageProducerProperties.acks - configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence - if (messageProducerProperties.clientId != null) { - configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! - } - // TODO("Security Implementation based on type") + log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + val configProps = messageProducerProperties.getConfig() - // Add additional Properties - if (additionalConfig != null) { + /** Add additional Properties */ + if (additionalConfig != null) configProps.putAll(additionalConfig) - } - if (kafkaProducer == null) { + if (kafkaProducer == null) kafkaProducer = KafkaProducer(configProps) - } + return kafkaProducer!! } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt index 0b353d58b..60f2dfa05 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt @@ -17,27 +17,22 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel -import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsConfig -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.util.Properties -open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) : +open class KafkaStreamsConsumerService(private val messageConsumerProperties: MessageConsumerProperties) : BlueprintMessageConsumerService { - val log = logger(KafkaStreamsBasicAuthConsumerService::class) + val log = logger(KafkaStreamsConsumerService::class) lateinit var kafkaStreams: KafkaStreams private fun streamsConfig(additionalConfig: Map<String, Any>? = null): Properties { val configProperties = Properties() - configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId - configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset - configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee - // TODO("Security Implementation based on type") + /** set consumer properties */ + messageConsumerProperties.getConfig().let { configProperties.putAll(it) } /** add or override already set properties */ additionalConfig?.let { configProperties.putAll(it) } /** Create Kafka consumer */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt index b10e1023b..612a57d23 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -27,17 +27,27 @@ import kotlin.test.assertNotNull class MessagePropertiesDSLTest { @Test - fun testMessageProducerDSL() { + fun testScramSslMessageProducerDSL() { val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { topologyTemplate { - relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") { - kafkaBasicAuth { + relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") { + kafkaScramSslAuth { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") acks("all") retries(3) enableIdempotence(true) topic("sample-topic") + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } } @@ -50,27 +60,27 @@ class MessagePropertiesDSLTest { val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates assertNotNull(relationshipTemplates, "failed to get relationship templates") assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match") - assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") + assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth") val relationshipTypes = serviceTemplate.relationshipTypes assertNotNull(relationshipTypes, "failed to get relationship types") assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match") assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" ) assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" ) } @Test - fun testMessageConsumerDSL() { + fun testScramSslAuthMessageConsumerDSL() { val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { topologyTemplate { - relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") { - kafkaBasicAuth { + relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") { + kafkaScramSslAuth { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") groupId("sample-group-id") @@ -79,15 +89,35 @@ class MessagePropertiesDSLTest { autoOffsetReset("latest") pollMillSec(5000) pollRecords(20) + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } - relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") { - kafkaStreamsBasicAuth { + relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") { + kafkaStreamsScramSslAuth { bootstrapServers("sample-bootstrapServers") applicationId("sample-application-id") autoOffsetReset("latest") processingGuarantee(StreamsConfig.EXACTLY_ONCE) topic("sample-streaming-topic") + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } } @@ -100,8 +130,8 @@ class MessagePropertiesDSLTest { val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates assertNotNull(relationshipTemplates, "failed to get relationship templates") assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match") - assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") - assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth") + assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth") + assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth") val relationshipTypes = serviceTemplate.relationshipTypes assertNotNull(relationshipTypes, "failed to get relationship types") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 823ba7dee..ac08dc7b7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.MockConsumer import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.StringDeserializer import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -52,18 +63,30 @@ import kotlin.test.assertTrue ) @TestPropertySource( properties = - ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", "blueprintsprocessor.messageconsumer.sample.topic=default-topic", "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", "blueprintsprocessor.messageconsumer.sample.pollRecords=1", + "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" ] ) open class BlueprintMessageConsumerServiceTest { @@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerService() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerWithDynamicFunction() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaScramSslAuthConfig() { + + val expectedConfig = mapOf<String, Any>( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ConsumerConfig.GROUP_ID_CONFIG to "sample-group", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) + + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample") + + val configProps = messageConsumerProperties.getConfig() + + assertEquals(messageConsumerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals(messageConsumerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value") + + expectedConfig.forEach { + assertTrue(configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}") + assertEquals(configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test fun testKafkaIntegration() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val channel = blueprintMessageConsumerService.subscribe(null) @@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest { /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService launch { repeat(5) { delay(100) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index b824189d2..72a47ed56 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,12 +20,22 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringSerializer import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration @@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import java.util.concurrent.Future import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue @RunWith(SpringRunner::class) @@ -43,10 +54,16 @@ import kotlin.test.assertTrue ) @TestPropertySource( properties = - ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" ] ) open class BlueprintMessageProducerServiceTest { @@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Test - fun testKafkaBasicAuthProducertService() { + fun testKafkaScramSslAuthProducerService() { runBlocking { val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() @@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest { assertTrue(response, "failed to get command response") } } + + @Test + fun testKafkaScramSslAuthConfig() { + val expectedConfig = mapOf<String, Any>( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) + + val messageProducerProperties = bluePrintMessageLibPropertyService + .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample") + + val configProps = messageProducerProperties.getConfig() + + assertEquals(messageProducerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals(messageProducerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value") + + expectedConfig.forEach { + assertTrue(configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" + ) + assertEquals(configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt index 1657d70b4..c30ab9b02 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -47,19 +47,27 @@ import kotlin.test.assertNotNull @TestPropertySource( properties = [ - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword", - "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth", "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", - "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword" ] ) -class KafkaStreamsBasicAuthConsumerServiceTest { +class KafkaStreamsConsumerServiceTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -117,7 +125,7 @@ class KafkaStreamsBasicAuthConsumerServiceTest { /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService launch { repeat(5) { delay(1000) |