From 8a2eb4ae98beb70eac4e5fa4bb2e786c6a9513d2 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Mon, 13 Apr 2020 16:42:36 -0400 Subject: Secure Kafka Authentication Implementation of kafka secure authentication : - SSL - SASL(SCRAM) & SSL Issue-ID: CCSDK-2313 Change-Id: I4b2fc7abab7478e360ebf461608a620d75708f54 Signed-off-by: Julien Fontaine --- .../message/BluePrintMessageLibConfiguration.kt | 4 + .../message/BluePrintMessageLibData.kt | 219 ++++++++++++++-- .../message/MessagePropertiesDSL.kt | 283 +++++++++++++++++++-- .../service/BluePrintMessageLibPropertyService.kt | 141 +++++++--- .../KafkaBasicAuthMessageConsumerService.kt | 171 ------------- .../KafkaBasicAuthMessageProducerService.kt | 106 -------- .../message/service/KafkaMessageConsumerService.kt | 150 +++++++++++ .../message/service/KafkaMessageProducerService.kt | 88 +++++++ .../KafkaStreamsBasicAuthConsumerService.kt | 71 ------ .../message/service/KafkaStreamsConsumerService.kt | 66 +++++ .../message/MessagePropertiesDSLTest.kt | 60 +++-- .../service/BlueprintMessageConsumerServiceTest.kt | 85 ++++++- .../service/BlueprintMessageProducerServiceTest.kt | 72 +++++- .../KafkaStreamsBasicAuthConsumerServiceTest.kt | 137 ---------- .../service/KafkaStreamsConsumerServiceTest.kt | 145 +++++++++++ 15 files changed, 1212 insertions(+), 586 deletions(-) delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt (limited to 'ms/blueprintsprocessor/modules/commons') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index cc4c7fa4a..c6587c799 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -61,6 +61,10 @@ class MessageLibConstants { const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth" + const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth" const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" + const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth" + const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth" } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 005223d9b..ac35fbf2c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -17,49 +17,238 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.StreamsConfig -/** Producer Properties **/ -open class MessageProducerProperties { +/** Common Properties **/ +abstract class CommonProperties { lateinit var type: String + lateinit var topic: String + lateinit var bootstrapServers: String + + open fun getConfig(): HashMap { + val configProps = hashMapOf() + configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + return configProps + } } +/** Message Producer */ +/** Message Producer Properties **/ +abstract class MessageProducerProperties : CommonProperties() + +/** Basic Auth */ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { - lateinit var bootstrapServers: String - var topic: String? = null + var clientId: String? = null // strongest producing guarantee var acks: String = "all" var retries: Int = 0 // ensure we don't push duplicates var enableIdempotence: Boolean = true + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java + configProps[ProducerConfig.ACKS_CONFIG] = acks + configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence + if (clientId != null) { + configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! + } + return configProps + } } -/** Consumer Properties **/ +/** SSL Auth */ +open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" -open class MessageConsumerProperties { - lateinit var type: String + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + + return configProps + } } -open class KafkaStreamsConsumerProperties : MessageConsumerProperties() { - lateinit var bootstrapServers: String +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Consumer */ +abstract class MessageConsumerProperties : CommonProperties() +/** Kafka Streams */ +/** Streams properties */ + +/** Basic Auth */ +open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() { lateinit var applicationId: String - lateinit var topic: String var autoOffsetReset: String = "latest" var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE + + override fun getConfig(): HashMap { + val configProperties = super.getConfig() + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee + return configProperties + } } -open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties() +/** SSL Auth */ +open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" -open class KafkaMessageConsumerProperties : MessageConsumerProperties() { - lateinit var bootstrapServers: String + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Message Consumer */ +/** Message Consumer Properties **/ +/** Basic Auth */ +open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() { lateinit var groupId: String lateinit var clientId: String - var topic: String? = null var autoCommit: Boolean = true var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 + + override fun getConfig(): HashMap { + val configProperties = super.getConfig() + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId + + /** To handle Back pressure, Get only configured record for processing */ + if (pollRecords > 0) { + configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords + } + + return configProperties + } } -open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() +/** SSL Auth */ +open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index 88039466d..c659fdb8b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -99,6 +99,20 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block) ) } + + fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageProducerProperties(block) + ) + } + + fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block) + ) + } } fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { @@ -108,9 +122,23 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth return assignments.asJsonType() } +fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaSslAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaScramSslAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() -class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { +open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) @@ -141,6 +169,61 @@ class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducer property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence) } +open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword) +} + /** Relationships Templates DSL for Message Consumer */ fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer( name: String, @@ -166,12 +249,40 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri ) } + fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block) + ) + } + + fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block) + ) + } + fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( BluePrintConstants.PROPERTY_CONNECTION_CONFIG, BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block) ) } + + fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block) + ) + } + + fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) + ) + } } fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { @@ -181,6 +292,20 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth return assignments.asJsonType() } +fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaSslAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] = @@ -188,81 +313,201 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa return assignments.asJsonType() } +fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsSslAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() -open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { +/** KafkaBasicAuthMessageConsumerProperties assignment builder */ +open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) fun bootstrapServers(bootstrapServers: JsonNode) = - property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers) + property(KafkaBasicAuthMessageConsumerProperties::bootstrapServers, bootstrapServers) fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive()) fun groupId(groupId: JsonNode) = - property(KafkaMessageConsumerProperties::groupId, groupId) + property(KafkaBasicAuthMessageConsumerProperties::groupId, groupId) fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive()) fun clientId(clientId: JsonNode) = - property(KafkaMessageConsumerProperties::clientId, clientId) + property(KafkaBasicAuthMessageConsumerProperties::clientId, clientId) fun topic(topic: String) = topic(topic.asJsonPrimitive()) fun topic(topic: JsonNode) = - property(KafkaMessageConsumerProperties::topic, topic) + property(KafkaBasicAuthMessageConsumerProperties::topic, topic) fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive()) fun autoCommit(autoCommit: JsonNode) = - property(KafkaMessageConsumerProperties::autoCommit, autoCommit) + property(KafkaBasicAuthMessageConsumerProperties::autoCommit, autoCommit) fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) fun autoOffsetReset(autoOffsetReset: JsonNode) = - property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset) + property(KafkaBasicAuthMessageConsumerProperties::autoOffsetReset, autoOffsetReset) fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive()) fun pollMillSec(pollMillSec: JsonNode) = - property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec) + property(KafkaBasicAuthMessageConsumerProperties::pollMillSec, pollMillSec) fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive()) fun pollRecords(pollRecords: JsonNode) = - property(KafkaMessageConsumerProperties::pollRecords, pollRecords) + property(KafkaBasicAuthMessageConsumerProperties::pollRecords, pollRecords) } -/** KafkaBasicAuthMessageConsumerProperties assignment builder */ -class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder() +open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword) +} /** KafkaStreamsConsumerProperties assignment builder */ -open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { +open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) fun bootstrapServers(bootstrapServers: JsonNode) = - property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers) + property(KafkaStreamsBasicAuthConsumerProperties::bootstrapServers, bootstrapServers) fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive()) fun applicationId(applicationId: JsonNode) = - property(KafkaStreamsConsumerProperties::applicationId, applicationId) + property(KafkaStreamsBasicAuthConsumerProperties::applicationId, applicationId) fun topic(topic: String) = topic(topic.asJsonPrimitive()) fun topic(topic: JsonNode) = - property(KafkaStreamsConsumerProperties::topic, topic) + property(KafkaStreamsBasicAuthConsumerProperties::topic, topic) fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) fun autoOffsetReset(autoOffsetReset: JsonNode) = - property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset) + property(KafkaStreamsBasicAuthConsumerProperties::autoOffsetReset, autoOffsetReset) fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive()) fun processingGuarantee(processingGuarantee: JsonNode) = - property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee) + property(KafkaStreamsBasicAuthConsumerProperties::processingGuarantee, processingGuarantee) } -class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder() +open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword) +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 44b50af44..67fbef580 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties @@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { val messageClientProperties = messageProducerProperties(jsonNode) - return blueprintMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties) } fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" val messageClientProperties = messageProducerProperties(prefix) - return blueprintMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties) } fun messageProducerProperties(prefix: String): MessageProducerProperties { val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageProducerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageProducerProperties::class.java + ) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!! } - else -> { - throw BluePrintProcessorException("Message adaptor($type) is not supported") + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!! } - } - } - - private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties): - BlueprintMessageProducerService { - - when (MessageProducerProperties) { - is KafkaBasicAuthMessageProducerProperties -> { - return KafkaBasicAuthMessageProducerService(MessageProducerProperties) + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!! } else -> { - throw BluePrintProcessorException("couldn't get Message client service for") + throw BluePrintProcessorException("Message adaptor($type) is not supported") } } } - private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageProducerProperties::class.java - ) - } - /** Consumer Property Lib Service Implementation **/ /** Return Message Consumer Service for [jsonNode] definitions. */ @@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun messageConsumerProperties(prefix: String): MessageConsumerProperties { val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { + /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageConsumerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageConsumerProperties::class.java + ) } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageConsumerProperties::class.java + ) + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - kafkaStreamsBasicAuthMessageConsumerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsSslAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java + ) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { val type = jsonNode.get("type").textValue() return when (type) { + /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!! + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!! + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): BlueprintMessageConsumerService { - when (messageConsumerProperties) { - is KafkaBasicAuthMessageConsumerProperties -> { - return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) + when (messageConsumerProperties.type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties + ) } - is KafkaStreamsBasicAuthConsumerProperties -> { - return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties) + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties + ) } else -> { - throw BluePrintProcessorException("couldn't get Message client service for") + throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") } } } - - private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageConsumerProperties::class.java - ) - } - - private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsBasicAuthConsumerProperties::class.java - ) - } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt deleted file mode 100644 index 3415c8d0d..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.nio.charset.Charset -import java.time.Duration -import kotlin.concurrent.thread - -open class KafkaBasicAuthMessageConsumerService( - private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties -) : - BlueprintMessageConsumerService { - - val log = logger(KafkaBasicAuthMessageConsumerService::class) - val channel = Channel() - var kafkaConsumer: Consumer? = null - - @Volatile - var keepGoing = true - - fun kafkaConsumer(additionalConfig: Map? = null): Consumer { - val configProperties = hashMapOf() - configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit - /** - * earliest: automatically reset the offset to the earliest offset - * latest: automatically reset the offset to the latest offset - */ - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId - - /** To handle Back pressure, Get only configured record for processing */ - if (messageConsumerProperties.pollRecords > 0) { - configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords - } - // TODO("Security Implementation based on type") - /** add or override already set properties */ - additionalConfig?.let { configProperties.putAll(it) } - /** Create Kafka consumer */ - return KafkaConsumer(configProperties) - } - - override suspend fun subscribe(additionalConfig: Map?): Channel { - /** get to topic names */ - val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } - check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } - return subscribe(consumerTopic, additionalConfig) - } - - override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { - /** Create Kafka consumer */ - kafkaConsumer = kafkaConsumer(additionalConfig) - - checkNotNull(kafkaConsumer) { - "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" - } - - kafkaConsumer!!.subscribe(topics) - log.info("Successfully consumed topic($topics)") - - thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { - keepGoing = true - kafkaConsumer!!.use { kc -> - while (keepGoing) { - val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.trace("Consumed Records : ${consumerRecords.count()}") - runBlocking { - consumerRecords?.forEach { consumerRecord -> - /** execute the command block */ - consumerRecord.value()?.let { - launch { - if (!channel.isClosedForSend) { - channel.send(String(it, Charset.defaultCharset())) - } else { - log.error("Channel is closed to receive message") - } - } - } - } - } - } - log.info("message listener shutting down.....") - } - } - return channel - } - - override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { - /** get to topic names */ - val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } - check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } - return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) - } - - override suspend fun consume( - topics: List, - additionalConfig: Map?, - consumerFunction: ConsumerFunction - ) { - - val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction - - /** Create Kafka consumer */ - kafkaConsumer = kafkaConsumer(additionalConfig) - - checkNotNull(kafkaConsumer) { - "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" - } - - kafkaConsumer!!.subscribe(topics) - log.info("Successfully consumed topic($topics)") - - thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { - keepGoing = true - kafkaConsumer!!.use { kc -> - while (keepGoing) { - val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.trace("Consumed Records : ${consumerRecords.count()}") - runBlocking { - /** Execute dynamic consumer Block substitution */ - kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) - } - } - log.info("message listener shutting down.....") - } - } - } - - override suspend fun shutDown() { - /** stop the polling loop */ - keepGoing = false - /** Close the Channel */ - channel.cancel() - /** TO shutdown gracefully, need to wait for the maximum poll time */ - delay(messageConsumerProperties.pollMillSec) - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt deleted file mode 100644 index 8416282af..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import org.apache.commons.lang.builder.ToStringBuilder -import org.apache.kafka.clients.producer.Callback -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringSerializer -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID -import org.slf4j.LoggerFactory -import java.nio.charset.Charset - -class KafkaBasicAuthMessageProducerService( - private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties -) : - BlueprintMessageProducerService { - - private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - - private var kafkaProducer: KafkaProducer? = null - - private val messageLoggerService = MessageLoggerService() - - override suspend fun sendMessageNB(message: Any): Boolean { - checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message) - } - - override suspend fun sendMessageNB(message: Any, headers: MutableMap?): Boolean { - checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message, headers) - } - - override suspend fun sendMessageNB( - topic: String, - message: Any, - headers: MutableMap? - ): Boolean { - val byteArrayMessage = when (message) { - is String -> message.toByteArray(Charset.defaultCharset()) - else -> message.asJsonString().toByteArray(Charset.defaultCharset()) - } - - val record = ProducerRecord(topic, defaultToUUID(), byteArrayMessage) - val recordHeaders = record.headers() - messageLoggerService.messageProducing(recordHeaders) - headers?.let { - headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } - } - val callback = Callback { metadata, exception -> - log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") - } - messageTemplate().send(record, callback) - return true - } - - fun messageTemplate(additionalConfig: Map? = null): KafkaProducer { - log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") - val configProps = hashMapOf() - configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers - configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java - configProps[ACKS_CONFIG] = messageProducerProperties.acks - configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence - if (messageProducerProperties.clientId != null) { - configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! - } - // TODO("Security Implementation based on type") - - // Add additional Properties - if (additionalConfig != null) { - configProps.putAll(additionalConfig) - } - - if (kafkaProducer == null) { - kafkaProducer = KafkaProducer(configProps) - } - return kafkaProducer!! - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt new file mode 100644 index 000000000..cdcd4197c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -0,0 +1,150 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset +import java.time.Duration +import kotlin.concurrent.thread + +open class KafkaMessageConsumerService( + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties +) : + BlueprintMessageConsumerService { + + val log = logger(KafkaMessageConsumerService::class) + val channel = Channel() + var kafkaConsumer: Consumer? = null + + @Volatile + var keepGoing = true + + fun kafkaConsumer(additionalConfig: Map? = null): Consumer { + val configProperties = messageConsumerProperties.getConfig() + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return KafkaConsumer(configProperties) + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return subscribe(consumerTopic, additionalConfig) + } + + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + consumerRecords?.forEach { consumerRecord -> + /** execute the command block */ + consumerRecord.value()?.let { + launch { + if (!channel.isClosedForSend) { + channel.send(String(it, Charset.defaultCharset())) + } else { + log.error("Channel is closed to receive message") + } + } + } + } + } + } + log.info("message listener shutting down.....") + } + } + return channel + } + + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume( + topics: List, + additionalConfig: Map?, + consumerFunction: ConsumerFunction + ) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + + override suspend fun shutDown() { + /** stop the polling loop */ + keepGoing = false + /** Close the Channel */ + channel.cancel() + /** TO shutdown gracefully, need to wait for the maximum poll time */ + delay(messageConsumerProperties.pollMillSec) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt new file mode 100644 index 000000000..931f052ed --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.slf4j.LoggerFactory +import java.nio.charset.Charset + +class KafkaMessageProducerService( + private val messageProducerProperties: MessageProducerProperties +) : + BlueprintMessageProducerService { + + private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!! + + private var kafkaProducer: KafkaProducer? = null + + private val messageLoggerService = MessageLoggerService() + + override suspend fun sendMessageNB(message: Any): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message) + } + + override suspend fun sendMessageNB(message: Any, headers: MutableMap?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } + + override suspend fun sendMessageNB( + topic: String, + message: Any, + headers: MutableMap? + ): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } + + val record = ProducerRecord(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) + headers?.let { + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") + } + messageTemplate().send(record, callback) + return true + } + + fun messageTemplate(additionalConfig: Map? = null): KafkaProducer { + log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + val configProps = messageProducerProperties.getConfig() + + /** Add additional Properties */ + if (additionalConfig != null) + configProps.putAll(additionalConfig) + + if (kafkaProducer == null) + kafkaProducer = KafkaProducer(configProps) + + return kafkaProducer!! + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt deleted file mode 100644 index 0b353d58b..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import kotlinx.coroutines.channels.Channel -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsConfig -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.util.Properties - -open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) : - BlueprintMessageConsumerService { - - val log = logger(KafkaStreamsBasicAuthConsumerService::class) - lateinit var kafkaStreams: KafkaStreams - - private fun streamsConfig(additionalConfig: Map? = null): Properties { - val configProperties = Properties() - configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId - configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset - configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee - // TODO("Security Implementation based on type") - /** add or override already set properties */ - additionalConfig?.let { configProperties.putAll(it) } - /** Create Kafka consumer */ - return configProperties - } - - override suspend fun subscribe(additionalConfig: Map?): Channel { - throw BluePrintProcessorException("not implemented") - } - - override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { - throw BluePrintProcessorException("not implemented") - } - - override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { - val streamsConfig = streamsConfig(additionalConfig) - val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction - val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) - log.info("Kafka streams topology : ${topology.describe()}") - kafkaStreams = KafkaStreams(topology, streamsConfig) - kafkaStreams.cleanUp() - kafkaStreams.start() - kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") } - } - - override suspend fun shutDown() { - if (kafkaStreams != null) { - kafkaStreams.close() - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt new file mode 100644 index 000000000..60f2dfa05 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt @@ -0,0 +1,66 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import org.apache.kafka.streams.KafkaStreams +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.util.Properties + +open class KafkaStreamsConsumerService(private val messageConsumerProperties: MessageConsumerProperties) : + BlueprintMessageConsumerService { + + val log = logger(KafkaStreamsConsumerService::class) + lateinit var kafkaStreams: KafkaStreams + + private fun streamsConfig(additionalConfig: Map? = null): Properties { + val configProperties = Properties() + /** set consumer properties */ + messageConsumerProperties.getConfig().let { configProperties.putAll(it) } + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return configProperties + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + val streamsConfig = streamsConfig(additionalConfig) + val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + log.info("Kafka streams topology : ${topology.describe()}") + kafkaStreams = KafkaStreams(topology, streamsConfig) + kafkaStreams.cleanUp() + kafkaStreams.start() + kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") } + } + + override suspend fun shutDown() { + if (kafkaStreams != null) { + kafkaStreams.close() + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt index b10e1023b..612a57d23 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -27,17 +27,27 @@ import kotlin.test.assertNotNull class MessagePropertiesDSLTest { @Test - fun testMessageProducerDSL() { + fun testScramSslMessageProducerDSL() { val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { topologyTemplate { - relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") { - kafkaBasicAuth { + relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") { + kafkaScramSslAuth { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") acks("all") retries(3) enableIdempotence(true) topic("sample-topic") + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } } @@ -50,27 +60,27 @@ class MessagePropertiesDSLTest { val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates assertNotNull(relationshipTemplates, "failed to get relationship templates") assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match") - assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") + assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth") val relationshipTypes = serviceTemplate.relationshipTypes assertNotNull(relationshipTypes, "failed to get relationship types") assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match") assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" ) assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" ) } @Test - fun testMessageConsumerDSL() { + fun testScramSslAuthMessageConsumerDSL() { val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { topologyTemplate { - relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") { - kafkaBasicAuth { + relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") { + kafkaScramSslAuth { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") groupId("sample-group-id") @@ -79,15 +89,35 @@ class MessagePropertiesDSLTest { autoOffsetReset("latest") pollMillSec(5000) pollRecords(20) + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } - relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") { - kafkaStreamsBasicAuth { + relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") { + kafkaStreamsScramSslAuth { bootstrapServers("sample-bootstrapServers") applicationId("sample-application-id") autoOffsetReset("latest") processingGuarantee(StreamsConfig.EXACTLY_ONCE) topic("sample-streaming-topic") + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } } @@ -100,8 +130,8 @@ class MessagePropertiesDSLTest { val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates assertNotNull(relationshipTemplates, "failed to get relationship templates") assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match") - assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") - assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth") + assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth") + assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth") val relationshipTypes = serviceTemplate.relationshipTypes assertNotNull(relationshipTypes, "failed to get relationship types") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 823ba7dee..ac08dc7b7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.MockConsumer import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.StringDeserializer import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -52,18 +63,30 @@ import kotlin.test.assertTrue ) @TestPropertySource( properties = - ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", "blueprintsprocessor.messageconsumer.sample.topic=default-topic", "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", "blueprintsprocessor.messageconsumer.sample.pollRecords=1", + "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" ] ) open class BlueprintMessageConsumerServiceTest { @@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerService() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerWithDynamicFunction() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaScramSslAuthConfig() { + + val expectedConfig = mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ConsumerConfig.GROUP_ID_CONFIG to "sample-group", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) + + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample") + + val configProps = messageConsumerProperties.getConfig() + + assertEquals(messageConsumerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals(messageConsumerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value") + + expectedConfig.forEach { + assertTrue(configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}") + assertEquals(configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test fun testKafkaIntegration() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val channel = blueprintMessageConsumerService.subscribe(null) @@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest { /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService launch { repeat(5) { delay(100) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index b824189d2..72a47ed56 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,12 +20,22 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringSerializer import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration @@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import java.util.concurrent.Future import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue @RunWith(SpringRunner::class) @@ -43,10 +54,16 @@ import kotlin.test.assertTrue ) @TestPropertySource( properties = - ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" ] ) open class BlueprintMessageProducerServiceTest { @@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Test - fun testKafkaBasicAuthProducertService() { + fun testKafkaScramSslAuthProducerService() { runBlocking { val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService val mockKafkaTemplate = mockk>() @@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest { assertTrue(response, "failed to get command response") } } + + @Test + fun testKafkaScramSslAuthConfig() { + val expectedConfig = mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) + + val messageProducerProperties = bluePrintMessageLibPropertyService + .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample") + + val configProps = messageProducerProperties.getConfig() + + assertEquals(messageProducerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals(messageProducerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value") + + expectedConfig.forEach { + assertTrue(configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" + ) + assertEquals(configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt deleted file mode 100644 index 1657d70b4..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.apache.kafka.streams.processor.Processor -import org.apache.kafka.streams.processor.ProcessorSupplier -import org.apache.kafka.streams.state.Stores -import org.junit.Test -import org.junit.runner.RunWith -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.TestPropertySource -import org.springframework.test.context.junit4.SpringRunner -import kotlin.test.assertNotNull - -@RunWith(SpringRunner::class) -@DirtiesContext -@ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] -) -@TestPropertySource( - properties = - [ - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - - "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", - "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", - "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" - - ] -) -class KafkaStreamsBasicAuthConsumerServiceTest { - - @Autowired - lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService - - @Test - fun testProperties() { - val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") - assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService") - } - - /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - // @Test - fun testKafkaStreamingMessageConsumer() { - runBlocking { - val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") - - // Dynamic Consumer Function to create Topology - val consumerFunction = object : KafkaStreamConsumerFunction { - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map? - ): Topology { - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") - topology.addSource("Source", *topics.toTypedArray()) - // Processor Supplier - val firstProcessorSupplier = object : ProcessorSupplier { - override fun get(): Processor { - return FirstProcessor() - } - } - val changelogConfig: MutableMap = hashMapOf() - changelogConfig.put("min.insync.replicas", "1") - - // Store Buolder - val countStoreSupplier = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("PriorityMessageState"), - Serdes.String(), - PriorityMessageSerde() - ) - .withLoggingEnabled(changelogConfig) - - topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") - topology.addStateStore(countStoreSupplier, "FirstProcessor") - topology.addSink( - "SINK", "default-stream-topic-out", Serdes.String().serializer(), - PriorityMessageSerde().serializer(), "FirstProcessor" - ) - return topology - } - } - - /** Send message with every 1 sec */ - val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService - launch { - repeat(5) { - delay(1000) - val headers: MutableMap = hashMapOf() - headers["id"] = it.toString() - blueprintMessageProducerService.sendMessageNB( - message = "this is my message($it)", - headers = headers - ) - } - } - streamingConsumerService.consume(null, consumerFunction) - delay(10000) - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt new file mode 100644 index 000000000..c30ab9b02 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -0,0 +1,145 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.apache.kafka.streams.state.Stores +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@DirtiesContext +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + [ + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword", + + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword" + + ] +) +class KafkaStreamsConsumerServiceTest { + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testProperties() { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService") + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + // @Test + fun testKafkaStreamingMessageConsumer() { + runBlocking { + val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + + // Dynamic Consumer Function to create Topology + val consumerFunction = object : KafkaStreamConsumerFunction { + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + topology.addSource("Source", *topics.toTypedArray()) + // Processor Supplier + val firstProcessorSupplier = object : ProcessorSupplier { + override fun get(): Processor { + return FirstProcessor() + } + } + val changelogConfig: MutableMap = hashMapOf() + changelogConfig.put("min.insync.replicas", "1") + + // Store Buolder + val countStoreSupplier = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("PriorityMessageState"), + Serdes.String(), + PriorityMessageSerde() + ) + .withLoggingEnabled(changelogConfig) + + topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") + topology.addStateStore(countStoreSupplier, "FirstProcessor") + topology.addSink( + "SINK", "default-stream-topic-out", Serdes.String().serializer(), + PriorityMessageSerde().serializer(), "FirstProcessor" + ) + return topology + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService + launch { + repeat(5) { + delay(1000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB( + message = "this is my message($it)", + headers = headers + ) + } + } + streamingConsumerService.consume(null, consumerFunction) + delay(10000) + streamingConsumerService.shutDown() + } + } +} -- cgit 1.2.3-korg