aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt219
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt283
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt141
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt)27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt)38
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt)15
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt60
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt85
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt72
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt)18
11 files changed, 794 insertions, 168 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index cc4c7fa4a..c6587c799 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -61,6 +61,10 @@ class MessageLibConstants {
const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
+ const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth"
+ const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth"
const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
+ const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth"
+ const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth"
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 005223d9b..ac35fbf2c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -17,49 +17,238 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.StreamsConfig
-/** Producer Properties **/
-open class MessageProducerProperties {
+/** Common Properties **/
+abstract class CommonProperties {
lateinit var type: String
+ lateinit var topic: String
+ lateinit var bootstrapServers: String
+
+ open fun getConfig(): HashMap<String, Any> {
+ val configProps = hashMapOf<String, Any>()
+ configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+ return configProps
+ }
}
+/** Message Producer */
+/** Message Producer Properties **/
+abstract class MessageProducerProperties : CommonProperties()
+
+/** Basic Auth */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
- lateinit var bootstrapServers: String
- var topic: String? = null
+
var clientId: String? = null
// strongest producing guarantee
var acks: String = "all"
var retries: Int = 0
// ensure we don't push duplicates
var enableIdempotence: Boolean = true
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+ configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
+ configProps[ProducerConfig.ACKS_CONFIG] = acks
+ configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
+ if (clientId != null) {
+ configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
+ }
+ return configProps
+ }
}
-/** Consumer Properties **/
+/** SSL Auth */
+open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+ lateinit var truststore: String
+ lateinit var truststorePassword: String
+ var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+ var keystore: String? = null
+ var keystorePassword: String? = null
+ var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+ var sslEndpointIdentificationAlgorithm: String = ""
-open class MessageConsumerProperties {
- lateinit var type: String
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+ configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ if (keystore != null) {
+ configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+ configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+ configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+ }
+ configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+
+ return configProps
+ }
}
-open class KafkaStreamsConsumerProperties : MessageConsumerProperties() {
- lateinit var bootstrapServers: String
+/** (SASL) SCRAM SSL Auth */
+class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
+
+/** Consumer */
+abstract class MessageConsumerProperties : CommonProperties()
+/** Kafka Streams */
+/** Streams properties */
+
+/** Basic Auth */
+open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
lateinit var applicationId: String
- lateinit var topic: String
var autoOffsetReset: String = "latest"
var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProperties = super.getConfig()
+ configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
+ configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
+ return configProperties
+ }
}
-open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties()
+/** SSL Auth */
+open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
+ lateinit var truststore: String
+ lateinit var truststorePassword: String
+ var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+ var keystore: String? = null
+ var keystorePassword: String? = null
+ var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+ var sslEndpointIdentificationAlgorithm: String = ""
-open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
- lateinit var bootstrapServers: String
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+ configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ if (keystore != null) {
+ configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+ configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+ configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+ }
+ configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+ return configProps
+ }
+}
+
+/** (SASL) SCRAM SSL Auth */
+class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
+
+/** Message Consumer */
+/** Message Consumer Properties **/
+/** Basic Auth */
+open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
lateinit var groupId: String
lateinit var clientId: String
- var topic: String? = null
var autoCommit: Boolean = true
var autoOffsetReset: String = "latest"
var pollMillSec: Long = 1000
var pollRecords: Int = -1
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProperties = super.getConfig()
+ configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+ configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
+ /**
+ * earliest: automatically reset the offset to the earliest offset
+ * latest: automatically reset the offset to the latest offset
+ */
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
+
+ /** To handle Back pressure, Get only configured record for processing */
+ if (pollRecords > 0) {
+ configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
+ }
+
+ return configProperties
+ }
}
-open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
+/** SSL Auth */
+open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+ lateinit var truststore: String
+ lateinit var truststorePassword: String
+ var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+ var keystore: String? = null
+ var keystorePassword: String? = null
+ var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+ var sslEndpointIdentificationAlgorithm: String = ""
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+ configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ if (keystore != null) {
+ configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+ configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+ configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+ }
+ configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+ return configProps
+ }
+}
+
+/** (SASL) SCRAM SSL Auth */
+class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index 88039466d..c659fdb8b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -99,6 +99,20 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri
BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block)
)
}
+
+ fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
+ )
+ }
+
+ fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
+ )
+ }
}
fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
@@ -108,9 +122,23 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth
return assignments.asJsonType()
}
+fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaSslAuthMessageProducerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaScramSslAuthMessageProducerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
-class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
+open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
@@ -141,6 +169,61 @@ class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducer
property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence)
}
+open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() {
+ fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+ fun truststore(truststore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
+
+ fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+ fun truststorePassword(truststorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
+
+ fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+ fun truststoreType(truststoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
+
+ fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+ fun keystore(keystore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+ fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+ fun keystorePassword(keystorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+ fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+ fun keystoreType(keystoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() {
+ fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+ fun saslMechanism(saslMechanism: JsonNode) =
+ property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
+
+ fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+ fun scramUsername(scramUsername: JsonNode) =
+ property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
+
+ fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+ fun scramPassword(scramPassword: JsonNode) =
+ property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
+}
+
/** Relationships Templates DSL for Message Consumer */
fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer(
name: String,
@@ -166,12 +249,40 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
)
}
+ fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
+ )
+ }
+
+ fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
+ )
+ }
+
fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block)
)
}
+
+ fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
+ )
+ }
+
+ fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
+ )
+ }
}
fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
@@ -181,6 +292,20 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth
return assignments.asJsonType()
}
+fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaSslAuthMessageConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] =
@@ -188,81 +313,201 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa
return assignments.asJsonType()
}
+fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaStreamsSslAuthConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
-open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+/** KafkaBasicAuthMessageConsumerProperties assignment builder */
+open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
fun bootstrapServers(bootstrapServers: JsonNode) =
- property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers)
+ property(KafkaBasicAuthMessageConsumerProperties::bootstrapServers, bootstrapServers)
fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive())
fun groupId(groupId: JsonNode) =
- property(KafkaMessageConsumerProperties::groupId, groupId)
+ property(KafkaBasicAuthMessageConsumerProperties::groupId, groupId)
fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive())
fun clientId(clientId: JsonNode) =
- property(KafkaMessageConsumerProperties::clientId, clientId)
+ property(KafkaBasicAuthMessageConsumerProperties::clientId, clientId)
fun topic(topic: String) = topic(topic.asJsonPrimitive())
fun topic(topic: JsonNode) =
- property(KafkaMessageConsumerProperties::topic, topic)
+ property(KafkaBasicAuthMessageConsumerProperties::topic, topic)
fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive())
fun autoCommit(autoCommit: JsonNode) =
- property(KafkaMessageConsumerProperties::autoCommit, autoCommit)
+ property(KafkaBasicAuthMessageConsumerProperties::autoCommit, autoCommit)
fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
fun autoOffsetReset(autoOffsetReset: JsonNode) =
- property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
+ property(KafkaBasicAuthMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive())
fun pollMillSec(pollMillSec: JsonNode) =
- property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec)
+ property(KafkaBasicAuthMessageConsumerProperties::pollMillSec, pollMillSec)
fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive())
fun pollRecords(pollRecords: JsonNode) =
- property(KafkaMessageConsumerProperties::pollRecords, pollRecords)
+ property(KafkaBasicAuthMessageConsumerProperties::pollRecords, pollRecords)
}
-/** KafkaBasicAuthMessageConsumerProperties assignment builder */
-class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder()
+open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() {
+ fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+ fun truststore(truststore: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
+
+ fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+ fun truststorePassword(truststorePassword: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
+
+ fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+ fun truststoreType(truststoreType: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
+
+ fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+ fun keystore(keystore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+ fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+ fun keystorePassword(keystorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+ fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+ fun keystoreType(keystoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() {
+ fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+ fun saslMechanism(saslMechanism: JsonNode) =
+ property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
+
+ fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+ fun scramUsername(scramUsername: JsonNode) =
+ property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
+
+ fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+ fun scramPassword(scramPassword: JsonNode) =
+ property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
+}
/** KafkaStreamsConsumerProperties assignment builder */
-open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
fun bootstrapServers(bootstrapServers: JsonNode) =
- property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers)
+ property(KafkaStreamsBasicAuthConsumerProperties::bootstrapServers, bootstrapServers)
fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive())
fun applicationId(applicationId: JsonNode) =
- property(KafkaStreamsConsumerProperties::applicationId, applicationId)
+ property(KafkaStreamsBasicAuthConsumerProperties::applicationId, applicationId)
fun topic(topic: String) = topic(topic.asJsonPrimitive())
fun topic(topic: JsonNode) =
- property(KafkaStreamsConsumerProperties::topic, topic)
+ property(KafkaStreamsBasicAuthConsumerProperties::topic, topic)
fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
fun autoOffsetReset(autoOffsetReset: JsonNode) =
- property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset)
+ property(KafkaStreamsBasicAuthConsumerProperties::autoOffsetReset, autoOffsetReset)
fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive())
fun processingGuarantee(processingGuarantee: JsonNode) =
- property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee)
+ property(KafkaStreamsBasicAuthConsumerProperties::processingGuarantee, processingGuarantee)
}
-class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder()
+open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() {
+ fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+ fun truststore(truststore: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
+
+ fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+ fun truststorePassword(truststorePassword: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
+
+ fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+ fun truststoreType(truststoreType: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
+
+ fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+ fun keystore(keystore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+ fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+ fun keystorePassword(keystorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+ fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+ fun keystoreType(keystoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() {
+ fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+ fun saslMechanism(saslMechanism: JsonNode) =
+ property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
+
+ fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+ fun scramUsername(scramUsername: JsonNode) =
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
+
+ fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+ fun scramPassword(scramPassword: JsonNode) =
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index 44b50af44..67fbef580 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
@@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
val messageClientProperties = messageProducerProperties(jsonNode)
- return blueprintMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties)
}
fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
val messageClientProperties = messageProducerProperties(prefix)
- return blueprintMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties)
}
fun messageProducerProperties(prefix: String): MessageProducerProperties {
val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageProducerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageProducerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaSslAuthMessageProducerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+ )
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
}
- else -> {
- throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!!
}
- }
- }
-
- private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties):
- BlueprintMessageProducerService {
-
- when (MessageProducerProperties) {
- is KafkaBasicAuthMessageProducerProperties -> {
- return KafkaBasicAuthMessageProducerService(MessageProducerProperties)
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
}
else -> {
- throw BluePrintProcessorException("couldn't get Message client service for")
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
}
}
- private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageProducerProperties::class.java
- )
- }
-
/** Consumer Property Lib Service Implementation **/
/** Return Message Consumer Service for [jsonNode] definitions. */
@@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
+ /** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageConsumerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaSslAuthMessageConsumerProperties::class.java
+ )
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+ )
+ }
+ /** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
- kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+ )
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
val type = jsonNode.get("type").textValue()
return when (type) {
+ /** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
+ }
+ /** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
BlueprintMessageConsumerService {
- when (messageConsumerProperties) {
- is KafkaBasicAuthMessageConsumerProperties -> {
- return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+ when (messageConsumerProperties.type) {
+ /** Message Consumer */
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+ )
}
- is KafkaStreamsBasicAuthConsumerProperties -> {
- return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+ /** Stream Consumer */
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
+ )
}
else -> {
- throw BluePrintProcessorException("couldn't get Message client service for")
+ throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
}
}
}
-
- private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageConsumerProperties::class.java
- )
- }
-
- private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
- )
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index 3415c8d0d..cdcd4197c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -21,24 +21,20 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
-open class KafkaBasicAuthMessageConsumerService(
+open class KafkaMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
) :
BlueprintMessageConsumerService {
- val log = logger(KafkaBasicAuthMessageConsumerService::class)
+ val log = logger(KafkaMessageConsumerService::class)
val channel = Channel<String>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
@@ -46,24 +42,7 @@ open class KafkaBasicAuthMessageConsumerService(
var keepGoing = true
fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
- val configProperties = hashMapOf<String, Any>()
- configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
- configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
- configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
- /**
- * earliest: automatically reset the offset to the earliest offset
- * latest: automatically reset the offset to the latest offset
- */
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId
-
- /** To handle Back pressure, Get only configured record for processing */
- if (messageConsumerProperties.pollRecords > 0) {
- configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
- }
- // TODO("Security Implementation based on type")
+ val configProperties = messageConsumerProperties.getConfig()
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 8416282af..931f052ed 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -20,28 +20,20 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
-class KafkaBasicAuthMessageProducerService(
- private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties
+class KafkaMessageProducerService(
+ private val messageProducerProperties: MessageProducerProperties
) :
BlueprintMessageProducerService {
- private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
+ private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!!
private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
@@ -81,26 +73,16 @@ class KafkaBasicAuthMessageProducerService(
}
fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
- log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
- val configProps = hashMapOf<String, Any>()
- configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
- configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
- configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
- configProps[ACKS_CONFIG] = messageProducerProperties.acks
- configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence
- if (messageProducerProperties.clientId != null) {
- configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
- }
- // TODO("Security Implementation based on type")
+ log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+ val configProps = messageProducerProperties.getConfig()
- // Add additional Properties
- if (additionalConfig != null) {
+ /** Add additional Properties */
+ if (additionalConfig != null)
configProps.putAll(additionalConfig)
- }
- if (kafkaProducer == null) {
+ if (kafkaProducer == null)
kafkaProducer = KafkaProducer(configProps)
- }
+
return kafkaProducer!!
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
index 0b353d58b..60f2dfa05 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
@@ -17,27 +17,22 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
-import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.KafkaStreams
-import org.apache.kafka.streams.StreamsConfig
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.util.Properties
-open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) :
+open class KafkaStreamsConsumerService(private val messageConsumerProperties: MessageConsumerProperties) :
BlueprintMessageConsumerService {
- val log = logger(KafkaStreamsBasicAuthConsumerService::class)
+ val log = logger(KafkaStreamsConsumerService::class)
lateinit var kafkaStreams: KafkaStreams
private fun streamsConfig(additionalConfig: Map<String, Any>? = null): Properties {
val configProperties = Properties()
- configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId
- configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
- configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee
- // TODO("Security Implementation based on type")
+ /** set consumer properties */
+ messageConsumerProperties.getConfig().let { configProperties.putAll(it) }
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
index b10e1023b..612a57d23 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -27,17 +27,27 @@ import kotlin.test.assertNotNull
class MessagePropertiesDSLTest {
@Test
- fun testMessageProducerDSL() {
+ fun testScramSslMessageProducerDSL() {
val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
topologyTemplate {
- relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") {
- kafkaBasicAuth {
+ relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") {
+ kafkaScramSslAuth {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
acks("all")
retries(3)
enableIdempotence(true)
topic("sample-topic")
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
}
@@ -50,27 +60,27 @@ class MessagePropertiesDSLTest {
val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
assertNotNull(relationshipTemplates, "failed to get relationship templates")
assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match")
- assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
+ assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
val relationshipTypes = serviceTemplate.relationshipTypes
assertNotNull(relationshipTypes, "failed to get relationship types")
assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match")
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
)
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
)
}
@Test
- fun testMessageConsumerDSL() {
+ fun testScramSslAuthMessageConsumerDSL() {
val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
topologyTemplate {
- relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") {
- kafkaBasicAuth {
+ relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") {
+ kafkaScramSslAuth {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
groupId("sample-group-id")
@@ -79,15 +89,35 @@ class MessagePropertiesDSLTest {
autoOffsetReset("latest")
pollMillSec(5000)
pollRecords(20)
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
- relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") {
- kafkaStreamsBasicAuth {
+ relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") {
+ kafkaStreamsScramSslAuth {
bootstrapServers("sample-bootstrapServers")
applicationId("sample-application-id")
autoOffsetReset("latest")
processingGuarantee(StreamsConfig.EXACTLY_ONCE)
topic("sample-streaming-topic")
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
}
@@ -100,8 +130,8 @@ class MessagePropertiesDSLTest {
val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
assertNotNull(relationshipTemplates, "failed to get relationship templates")
assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match")
- assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
- assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth")
+ assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
+ assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth")
val relationshipTypes = serviceTemplate.relationshipTypes
assertNotNull(relationshipTypes, "failed to get relationship types")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 823ba7dee..ac08dc7b7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -52,18 +63,30 @@ import kotlin.test.assertTrue
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
"blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+ "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
]
)
open class BlueprintMessageConsumerServiceTest {
@@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerService() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerWithDynamicFunction() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest {
}
}
+ @Test
+ fun testKafkaScramSslAuthConfig() {
+
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
+
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
+
+ val configProps = messageConsumerProperties.getConfig()
+
+ assertEquals(messageConsumerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(messageConsumerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value")
+
+ expectedConfig.forEach {
+ assertTrue(configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}")
+ assertEquals(configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
fun testKafkaIntegration() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val channel = blueprintMessageConsumerService.subscribe(null)
@@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
launch {
repeat(5) {
delay(100)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index b824189d2..72a47ed56 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -20,12 +20,22 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
@@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.Future
import kotlin.test.Test
+import kotlin.test.assertEquals
import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@@ -43,10 +54,16 @@ import kotlin.test.assertTrue
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
]
)
open class BlueprintMessageProducerServiceTest {
@@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Test
- fun testKafkaBasicAuthProducertService() {
+ fun testKafkaScramSslAuthProducerService() {
runBlocking {
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
@@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest {
assertTrue(response, "failed to get command response")
}
}
+
+ @Test
+ fun testKafkaScramSslAuthConfig() {
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+ ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
+
+ val messageProducerProperties = bluePrintMessageLibPropertyService
+ .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
+
+ val configProps = messageProducerProperties.getConfig()
+
+ assertEquals(messageProducerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(messageProducerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value")
+
+ expectedConfig.forEach {
+ assertTrue(configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 1657d70b4..c30ab9b02 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -47,19 +47,27 @@ import kotlin.test.assertNotNull
@TestPropertySource(
properties =
[
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
"blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
- "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword"
]
)
-class KafkaStreamsBasicAuthConsumerServiceTest {
+class KafkaStreamsConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -117,7 +125,7 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
launch {
repeat(5) {
delay(1000)