From 3cba071c0d52c446db67f3a73db2d31e594d1653 Mon Sep 17 00:00:00 2001 From: "david.mcweeney" Date: Thu, 17 Feb 2022 17:07:59 +0000 Subject: DMAAP-1706 - New Kafka Auth option Change-Id: I8533721d23d6adb41f65cb96fb2b8f852bda47b8 Signed-off-by: david.mcweeney Issue-ID: DMAAP-1706 --- .../message/BluePrintMessageLibConfiguration.kt | 2 + .../message/BluePrintMessageLibData.kt | 37 +++++++++++++ .../service/BluePrintMessageLibPropertyService.kt | 28 ++++++++++ .../service/BlueprintMessageConsumerServiceTest.kt | 62 +++++++++++++++++++++ .../service/BlueprintMessageProducerServiceTest.kt | 64 +++++++++++++++++++++- 5 files changed, 192 insertions(+), 1 deletion(-) (limited to 'ms/blueprintsprocessor') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 659295a6b..249af8812 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -1,6 +1,7 @@ /* * Copyright © 2019 IBM. * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modification Copyright (C) 2022 Nordix Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,5 +68,6 @@ class MessageLibConstants { const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth" const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth" + const val TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH = "kafka-scram-plain-text-auth" } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 3e7db9597..886c87c0b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -1,6 +1,7 @@ /* * Copyright © 2019 IBM. * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modification Copyright (C) 2022 Nordix Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -117,6 +118,24 @@ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerPr } } +/** (SASL) SCRAM Plaintext Auth */ +class KafkaScramPlainTextAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { + + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + /** Consumer */ abstract class MessageConsumerProperties : CommonProperties() /** Kafka Streams */ @@ -265,3 +284,21 @@ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerPr return configProps } } + +/** (SASL) SCRAM Plaintext Auth */ +class KafkaScramPlaintextAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { + + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index c0cf51b93..d10f2d43b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -1,6 +1,7 @@ /* * Copyright © 2019 IBM. * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property + * Modification Copyright (C) 2022 Nordix Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +23,8 @@ import io.micrometer.core.instrument.MeterRegistry import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlainTextAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlaintextAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties @@ -71,6 +74,11 @@ open class BluePrintMessageLibPropertyService( prefix, KafkaScramSslAuthMessageProducerProperties::class.java ) } + MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramPlainTextAuthMessageProducerProperties::class.java + ) + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -89,6 +97,9 @@ open class BluePrintMessageLibPropertyService( MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramPlainTextAuthMessageProducerProperties::class.java)!! + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -130,6 +141,12 @@ open class BluePrintMessageLibPropertyService( prefix, KafkaScramSslAuthMessageConsumerProperties::class.java ) } + MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramPlaintextAuthMessageConsumerProperties::class.java + ) + } + /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { bluePrintPropertiesService.propertyBeanType( @@ -146,6 +163,7 @@ open class BluePrintMessageLibPropertyService( prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java ) } + else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -165,6 +183,9 @@ open class BluePrintMessageLibPropertyService( MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramPlaintextAuthMessageConsumerProperties::class.java)!! + } /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! @@ -175,6 +196,7 @@ open class BluePrintMessageLibPropertyService( MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!! } + else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -204,6 +226,12 @@ open class BluePrintMessageLibPropertyService( meterRegistry ) } + MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaScramPlaintextAuthMessageConsumerProperties, + meterRegistry + ) + } /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { return KafkaStreamsConsumerService( diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 9c37b9d13..37d8b24fb 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,6 +1,7 @@ /* * Copyright © 2019 IBM. * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. + * Modification Copyright (C) 2022 Nordix Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,6 +84,16 @@ import kotlin.test.assertTrue "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user", "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample2.type=kafka-scram-plain-text-auth", + "blueprintsprocessor.messageconsumer.sample2.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample2.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample2.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample2.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample2.pollMillSec=10", + "blueprintsprocessor.messageconsumer.sample2.pollRecords=1", + "blueprintsprocessor.messageconsumer.sample2.scramUsername=sample-user", + "blueprintsprocessor.messageconsumer.sample2.scramPassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", @@ -271,6 +282,57 @@ open class BlueprintMessageConsumerServiceTest { } } + + @Test + fun testKafkaScramPlaintextAuthConfig() { + val expectedConfig = mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ConsumerConfig.GROUP_ID_CONFIG to "sample-group", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString() + ) + + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample2") + + val configProps = messageConsumerProperties.getConfig() + + assertEquals( + messageConsumerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals( + messageConsumerProperties.type, + "kafka-scram-plain-text-auth", + "Authentication type doesn't match the expected value" + ) + + assertTrue( + configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG), + "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}" + ) + assertTrue( + configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"), + "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id" + ) + + expectedConfig.forEach { + assertTrue( + configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" + ) + assertEquals( + configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test fun testKafkaIntegration() { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 87819f677..bb35b6614 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -1,6 +1,7 @@ /* * Copyright © 2019 IBM. * Modifications Copyright © 2021 Bell Canada. + * Modification Copyright (C) 2022 Nordix Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants.Companion.PROPERTY_MESSAGE_PRODUCER_PREFIX import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.annotation.DirtiesContext @@ -69,9 +71,17 @@ import kotlin.test.assertTrue "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", - "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword", + + "blueprintsprocessor.messageproducer.sample2.type=kafka-scram-plain-text-auth", + "blueprintsprocessor.messageproducer.sample2.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample2.topic=default-topic", + "blueprintsprocessor.messageproducer.sample2.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample2.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample2.scramPassword=secretpassword" ] ) + open class BlueprintMessageProducerServiceTest { @Autowired @@ -163,4 +173,56 @@ open class BlueprintMessageProducerServiceTest { ) } } + + @Test + fun testKafkaScramPlaintextAuthConfig() { + + val expectedConfig = mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.MAX_BLOCK_MS_CONFIG to 250, + ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000, + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString() + ) + + val messageProducerProperties = bluePrintMessageLibPropertyService + .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample2") + + val configProps = messageProducerProperties.getConfig() + + assertEquals( + messageProducerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals( + messageProducerProperties.type, + "kafka-scram-plain-text-auth", + "Authentication type doesn't match the expected value" + ) + + assertTrue( + configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG), + "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}" + ) + assertTrue( + configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"), + "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id" + ) + + expectedConfig.forEach { + assertTrue( + configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" + ) + assertEquals( + configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } } -- cgit 1.2.3-korg