From 8a2eb4ae98beb70eac4e5fa4bb2e786c6a9513d2 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Mon, 13 Apr 2020 16:42:36 -0400 Subject: Secure Kafka Authentication Implementation of kafka secure authentication : - SSL - SASL(SCRAM) & SSL Issue-ID: CCSDK-2313 Change-Id: I4b2fc7abab7478e360ebf461608a620d75708f54 Signed-off-by: Julien Fontaine --- .../message/BluePrintMessageLibConfiguration.kt | 4 + .../message/BluePrintMessageLibData.kt | 219 ++++++++++++++-- .../message/MessagePropertiesDSL.kt | 283 +++++++++++++++++++-- .../service/BluePrintMessageLibPropertyService.kt | 141 +++++++--- .../KafkaBasicAuthMessageConsumerService.kt | 171 ------------- .../KafkaBasicAuthMessageProducerService.kt | 106 -------- .../message/service/KafkaMessageConsumerService.kt | 150 +++++++++++ .../message/service/KafkaMessageProducerService.kt | 88 +++++++ .../KafkaStreamsBasicAuthConsumerService.kt | 71 ------ .../message/service/KafkaStreamsConsumerService.kt | 66 +++++ 10 files changed, 876 insertions(+), 423 deletions(-) delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index cc4c7fa4a..c6587c799 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -61,6 +61,10 @@ class MessageLibConstants { const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth" + const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth" const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" + const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth" + const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth" } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 005223d9b..ac35fbf2c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -17,49 +17,238 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.StreamsConfig -/** Producer Properties **/ -open class MessageProducerProperties { +/** Common Properties **/ +abstract class CommonProperties { lateinit var type: String + lateinit var topic: String + lateinit var bootstrapServers: String + + open fun getConfig(): HashMap { + val configProps = hashMapOf() + configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + return configProps + } } +/** Message Producer */ +/** Message Producer Properties **/ +abstract class MessageProducerProperties : CommonProperties() + +/** Basic Auth */ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { - lateinit var bootstrapServers: String - var topic: String? = null + var clientId: String? = null // strongest producing guarantee var acks: String = "all" var retries: Int = 0 // ensure we don't push duplicates var enableIdempotence: Boolean = true + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java + configProps[ProducerConfig.ACKS_CONFIG] = acks + configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence + if (clientId != null) { + configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! + } + return configProps + } } -/** Consumer Properties **/ +/** SSL Auth */ +open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" -open class MessageConsumerProperties { - lateinit var type: String + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + + return configProps + } } -open class KafkaStreamsConsumerProperties : MessageConsumerProperties() { - lateinit var bootstrapServers: String +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Consumer */ +abstract class MessageConsumerProperties : CommonProperties() +/** Kafka Streams */ +/** Streams properties */ + +/** Basic Auth */ +open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() { lateinit var applicationId: String - lateinit var topic: String var autoOffsetReset: String = "latest" var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE + + override fun getConfig(): HashMap { + val configProperties = super.getConfig() + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee + return configProperties + } } -open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties() +/** SSL Auth */ +open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" -open class KafkaMessageConsumerProperties : MessageConsumerProperties() { - lateinit var bootstrapServers: String + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Message Consumer */ +/** Message Consumer Properties **/ +/** Basic Auth */ +open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() { lateinit var groupId: String lateinit var clientId: String - var topic: String? = null var autoCommit: Boolean = true var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 + + override fun getConfig(): HashMap { + val configProperties = super.getConfig() + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId + + /** To handle Back pressure, Get only configured record for processing */ + if (pollRecords > 0) { + configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords + } + + return configProperties + } } -open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() +/** SSL Auth */ +open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = "" + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index 88039466d..c659fdb8b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -99,6 +99,20 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block) ) } + + fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageProducerProperties(block) + ) + } + + fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block) + ) + } } fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { @@ -108,9 +122,23 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth return assignments.asJsonType() } +fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaSslAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaScramSslAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() -class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { +open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) @@ -141,6 +169,61 @@ class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducer property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence) } +open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword) +} + /** Relationships Templates DSL for Message Consumer */ fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer( name: String, @@ -166,12 +249,40 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri ) } + fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block) + ) + } + + fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block) + ) + } + fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( BluePrintConstants.PROPERTY_CONNECTION_CONFIG, BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block) ) } + + fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block) + ) + } + + fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) + ) + } } fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { @@ -181,6 +292,20 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth return assignments.asJsonType() } +fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaSslAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] = @@ -188,81 +313,201 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa return assignments.asJsonType() } +fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsSslAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() -open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { +/** KafkaBasicAuthMessageConsumerProperties assignment builder */ +open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) fun bootstrapServers(bootstrapServers: JsonNode) = - property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers) + property(KafkaBasicAuthMessageConsumerProperties::bootstrapServers, bootstrapServers) fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive()) fun groupId(groupId: JsonNode) = - property(KafkaMessageConsumerProperties::groupId, groupId) + property(KafkaBasicAuthMessageConsumerProperties::groupId, groupId) fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive()) fun clientId(clientId: JsonNode) = - property(KafkaMessageConsumerProperties::clientId, clientId) + property(KafkaBasicAuthMessageConsumerProperties::clientId, clientId) fun topic(topic: String) = topic(topic.asJsonPrimitive()) fun topic(topic: JsonNode) = - property(KafkaMessageConsumerProperties::topic, topic) + property(KafkaBasicAuthMessageConsumerProperties::topic, topic) fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive()) fun autoCommit(autoCommit: JsonNode) = - property(KafkaMessageConsumerProperties::autoCommit, autoCommit) + property(KafkaBasicAuthMessageConsumerProperties::autoCommit, autoCommit) fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) fun autoOffsetReset(autoOffsetReset: JsonNode) = - property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset) + property(KafkaBasicAuthMessageConsumerProperties::autoOffsetReset, autoOffsetReset) fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive()) fun pollMillSec(pollMillSec: JsonNode) = - property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec) + property(KafkaBasicAuthMessageConsumerProperties::pollMillSec, pollMillSec) fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive()) fun pollRecords(pollRecords: JsonNode) = - property(KafkaMessageConsumerProperties::pollRecords, pollRecords) + property(KafkaBasicAuthMessageConsumerProperties::pollRecords, pollRecords) } -/** KafkaBasicAuthMessageConsumerProperties assignment builder */ -class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder() +open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword) +} /** KafkaStreamsConsumerProperties assignment builder */ -open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { +open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) fun bootstrapServers(bootstrapServers: JsonNode) = - property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers) + property(KafkaStreamsBasicAuthConsumerProperties::bootstrapServers, bootstrapServers) fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive()) fun applicationId(applicationId: JsonNode) = - property(KafkaStreamsConsumerProperties::applicationId, applicationId) + property(KafkaStreamsBasicAuthConsumerProperties::applicationId, applicationId) fun topic(topic: String) = topic(topic.asJsonPrimitive()) fun topic(topic: JsonNode) = - property(KafkaStreamsConsumerProperties::topic, topic) + property(KafkaStreamsBasicAuthConsumerProperties::topic, topic) fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) fun autoOffsetReset(autoOffsetReset: JsonNode) = - property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset) + property(KafkaStreamsBasicAuthConsumerProperties::autoOffsetReset, autoOffsetReset) fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive()) fun processingGuarantee(processingGuarantee: JsonNode) = - property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee) + property(KafkaStreamsBasicAuthConsumerProperties::processingGuarantee, processingGuarantee) } -class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder() +open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) + + fun truststore(truststore: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore) + + fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) + + fun truststorePassword(truststorePassword: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword) + + fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) + + fun truststoreType(truststoreType: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType) + + fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) + + fun keystore(keystore: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + + fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) + + fun keystorePassword(keystorePassword: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + + fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) + + fun keystoreType(keystoreType: JsonNode) = + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + + fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = + property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) +} + +class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) + + fun saslMechanism(saslMechanism: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism) + + fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) + + fun scramUsername(scramUsername: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername) + + fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) + + fun scramPassword(scramPassword: JsonNode) = + property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword) +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 44b50af44..67fbef580 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties @@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { val messageClientProperties = messageProducerProperties(jsonNode) - return blueprintMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties) } fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" val messageClientProperties = messageProducerProperties(prefix) - return blueprintMessageProducerService(messageClientProperties) + return KafkaMessageProducerService(messageClientProperties) } fun messageProducerProperties(prefix: String): MessageProducerProperties { val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageProducerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageProducerProperties::class.java + ) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!! } - else -> { - throw BluePrintProcessorException("Message adaptor($type) is not supported") + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!! } - } - } - - private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties): - BlueprintMessageProducerService { - - when (MessageProducerProperties) { - is KafkaBasicAuthMessageProducerProperties -> { - return KafkaBasicAuthMessageProducerService(MessageProducerProperties) + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!! } else -> { - throw BluePrintProcessorException("couldn't get Message client service for") + throw BluePrintProcessorException("Message adaptor($type) is not supported") } } } - private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageProducerProperties::class.java - ) - } - /** Consumer Property Lib Service Implementation **/ /** Return Message Consumer Service for [jsonNode] definitions. */ @@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun messageConsumerProperties(prefix: String): MessageConsumerProperties { val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { + /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageConsumerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageConsumerProperties::class.java + ) } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageConsumerProperties::class.java + ) + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - kafkaStreamsBasicAuthMessageConsumerProperties(prefix) + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsSslAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java + ) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { val type = jsonNode.get("type").textValue() return when (type) { + /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!! + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!! + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): BlueprintMessageConsumerService { - when (messageConsumerProperties) { - is KafkaBasicAuthMessageConsumerProperties -> { - return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) + when (messageConsumerProperties.type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties + ) } - is KafkaStreamsBasicAuthConsumerProperties -> { - return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties) + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties + ) } else -> { - throw BluePrintProcessorException("couldn't get Message client service for") + throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") } } } - - private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageConsumerProperties::class.java - ) - } - - private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties { - return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsBasicAuthConsumerProperties::class.java - ) - } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt deleted file mode 100644 index 3415c8d0d..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.nio.charset.Charset -import java.time.Duration -import kotlin.concurrent.thread - -open class KafkaBasicAuthMessageConsumerService( - private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties -) : - BlueprintMessageConsumerService { - - val log = logger(KafkaBasicAuthMessageConsumerService::class) - val channel = Channel() - var kafkaConsumer: Consumer? = null - - @Volatile - var keepGoing = true - - fun kafkaConsumer(additionalConfig: Map? = null): Consumer { - val configProperties = hashMapOf() - configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit - /** - * earliest: automatically reset the offset to the earliest offset - * latest: automatically reset the offset to the latest offset - */ - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId - - /** To handle Back pressure, Get only configured record for processing */ - if (messageConsumerProperties.pollRecords > 0) { - configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords - } - // TODO("Security Implementation based on type") - /** add or override already set properties */ - additionalConfig?.let { configProperties.putAll(it) } - /** Create Kafka consumer */ - return KafkaConsumer(configProperties) - } - - override suspend fun subscribe(additionalConfig: Map?): Channel { - /** get to topic names */ - val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } - check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } - return subscribe(consumerTopic, additionalConfig) - } - - override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { - /** Create Kafka consumer */ - kafkaConsumer = kafkaConsumer(additionalConfig) - - checkNotNull(kafkaConsumer) { - "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" - } - - kafkaConsumer!!.subscribe(topics) - log.info("Successfully consumed topic($topics)") - - thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { - keepGoing = true - kafkaConsumer!!.use { kc -> - while (keepGoing) { - val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.trace("Consumed Records : ${consumerRecords.count()}") - runBlocking { - consumerRecords?.forEach { consumerRecord -> - /** execute the command block */ - consumerRecord.value()?.let { - launch { - if (!channel.isClosedForSend) { - channel.send(String(it, Charset.defaultCharset())) - } else { - log.error("Channel is closed to receive message") - } - } - } - } - } - } - log.info("message listener shutting down.....") - } - } - return channel - } - - override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { - /** get to topic names */ - val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } - check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } - return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) - } - - override suspend fun consume( - topics: List, - additionalConfig: Map?, - consumerFunction: ConsumerFunction - ) { - - val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction - - /** Create Kafka consumer */ - kafkaConsumer = kafkaConsumer(additionalConfig) - - checkNotNull(kafkaConsumer) { - "failed to create kafka consumer for " + - "server(${messageConsumerProperties.bootstrapServers})'s " + - "topics(${messageConsumerProperties.bootstrapServers})" - } - - kafkaConsumer!!.subscribe(topics) - log.info("Successfully consumed topic($topics)") - - thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { - keepGoing = true - kafkaConsumer!!.use { kc -> - while (keepGoing) { - val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.trace("Consumed Records : ${consumerRecords.count()}") - runBlocking { - /** Execute dynamic consumer Block substitution */ - kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) - } - } - log.info("message listener shutting down.....") - } - } - } - - override suspend fun shutDown() { - /** stop the polling loop */ - keepGoing = false - /** Close the Channel */ - channel.cancel() - /** TO shutdown gracefully, need to wait for the maximum poll time */ - delay(messageConsumerProperties.pollMillSec) - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt deleted file mode 100644 index 8416282af..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import org.apache.commons.lang.builder.ToStringBuilder -import org.apache.kafka.clients.producer.Callback -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringSerializer -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID -import org.slf4j.LoggerFactory -import java.nio.charset.Charset - -class KafkaBasicAuthMessageProducerService( - private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties -) : - BlueprintMessageProducerService { - - private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - - private var kafkaProducer: KafkaProducer? = null - - private val messageLoggerService = MessageLoggerService() - - override suspend fun sendMessageNB(message: Any): Boolean { - checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message) - } - - override suspend fun sendMessageNB(message: Any, headers: MutableMap?): Boolean { - checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message, headers) - } - - override suspend fun sendMessageNB( - topic: String, - message: Any, - headers: MutableMap? - ): Boolean { - val byteArrayMessage = when (message) { - is String -> message.toByteArray(Charset.defaultCharset()) - else -> message.asJsonString().toByteArray(Charset.defaultCharset()) - } - - val record = ProducerRecord(topic, defaultToUUID(), byteArrayMessage) - val recordHeaders = record.headers() - messageLoggerService.messageProducing(recordHeaders) - headers?.let { - headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } - } - val callback = Callback { metadata, exception -> - log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") - } - messageTemplate().send(record, callback) - return true - } - - fun messageTemplate(additionalConfig: Map? = null): KafkaProducer { - log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") - val configProps = hashMapOf() - configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers - configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java - configProps[ACKS_CONFIG] = messageProducerProperties.acks - configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence - if (messageProducerProperties.clientId != null) { - configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! - } - // TODO("Security Implementation based on type") - - // Add additional Properties - if (additionalConfig != null) { - configProps.putAll(additionalConfig) - } - - if (kafkaProducer == null) { - kafkaProducer = KafkaProducer(configProps) - } - return kafkaProducer!! - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt new file mode 100644 index 000000000..cdcd4197c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -0,0 +1,150 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset +import java.time.Duration +import kotlin.concurrent.thread + +open class KafkaMessageConsumerService( + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties +) : + BlueprintMessageConsumerService { + + val log = logger(KafkaMessageConsumerService::class) + val channel = Channel() + var kafkaConsumer: Consumer? = null + + @Volatile + var keepGoing = true + + fun kafkaConsumer(additionalConfig: Map? = null): Consumer { + val configProperties = messageConsumerProperties.getConfig() + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return KafkaConsumer(configProperties) + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return subscribe(consumerTopic, additionalConfig) + } + + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + consumerRecords?.forEach { consumerRecord -> + /** execute the command block */ + consumerRecord.value()?.let { + launch { + if (!channel.isClosedForSend) { + channel.send(String(it, Charset.defaultCharset())) + } else { + log.error("Channel is closed to receive message") + } + } + } + } + } + } + log.info("message listener shutting down.....") + } + } + return channel + } + + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume( + topics: List, + additionalConfig: Map?, + consumerFunction: ConsumerFunction + ) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + + override suspend fun shutDown() { + /** stop the polling loop */ + keepGoing = false + /** Close the Channel */ + channel.cancel() + /** TO shutdown gracefully, need to wait for the maximum poll time */ + delay(messageConsumerProperties.pollMillSec) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt new file mode 100644 index 000000000..931f052ed --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.slf4j.LoggerFactory +import java.nio.charset.Charset + +class KafkaMessageProducerService( + private val messageProducerProperties: MessageProducerProperties +) : + BlueprintMessageProducerService { + + private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!! + + private var kafkaProducer: KafkaProducer? = null + + private val messageLoggerService = MessageLoggerService() + + override suspend fun sendMessageNB(message: Any): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message) + } + + override suspend fun sendMessageNB(message: Any, headers: MutableMap?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } + + override suspend fun sendMessageNB( + topic: String, + message: Any, + headers: MutableMap? + ): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } + + val record = ProducerRecord(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) + headers?.let { + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") + } + messageTemplate().send(record, callback) + return true + } + + fun messageTemplate(additionalConfig: Map? = null): KafkaProducer { + log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + val configProps = messageProducerProperties.getConfig() + + /** Add additional Properties */ + if (additionalConfig != null) + configProps.putAll(additionalConfig) + + if (kafkaProducer == null) + kafkaProducer = KafkaProducer(configProps) + + return kafkaProducer!! + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt deleted file mode 100644 index 0b353d58b..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import kotlinx.coroutines.channels.Channel -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsConfig -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.util.Properties - -open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) : - BlueprintMessageConsumerService { - - val log = logger(KafkaStreamsBasicAuthConsumerService::class) - lateinit var kafkaStreams: KafkaStreams - - private fun streamsConfig(additionalConfig: Map? = null): Properties { - val configProperties = Properties() - configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId - configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset - configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee - // TODO("Security Implementation based on type") - /** add or override already set properties */ - additionalConfig?.let { configProperties.putAll(it) } - /** Create Kafka consumer */ - return configProperties - } - - override suspend fun subscribe(additionalConfig: Map?): Channel { - throw BluePrintProcessorException("not implemented") - } - - override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { - throw BluePrintProcessorException("not implemented") - } - - override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { - val streamsConfig = streamsConfig(additionalConfig) - val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction - val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) - log.info("Kafka streams topology : ${topology.describe()}") - kafkaStreams = KafkaStreams(topology, streamsConfig) - kafkaStreams.cleanUp() - kafkaStreams.start() - kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") } - } - - override suspend fun shutDown() { - if (kafkaStreams != null) { - kafkaStreams.close() - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt new file mode 100644 index 000000000..60f2dfa05 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt @@ -0,0 +1,66 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import org.apache.kafka.streams.KafkaStreams +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.util.Properties + +open class KafkaStreamsConsumerService(private val messageConsumerProperties: MessageConsumerProperties) : + BlueprintMessageConsumerService { + + val log = logger(KafkaStreamsConsumerService::class) + lateinit var kafkaStreams: KafkaStreams + + private fun streamsConfig(additionalConfig: Map? = null): Properties { + val configProperties = Properties() + /** set consumer properties */ + messageConsumerProperties.getConfig().let { configProperties.putAll(it) } + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return configProperties + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + val streamsConfig = streamsConfig(additionalConfig) + val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + log.info("Kafka streams topology : ${topology.describe()}") + kafkaStreams = KafkaStreams(topology, streamsConfig) + kafkaStreams.cleanUp() + kafkaStreams.start() + kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") } + } + + override suspend fun shutDown() { + if (kafkaStreams != null) { + kafkaStreams.close() + } + } +} -- cgit 1.2.3-korg