summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/test
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-04-13 16:42:36 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-04-21 13:57:31 -0400
commit8a2eb4ae98beb70eac4e5fa4bb2e786c6a9513d2 (patch)
treed953dcfa97e409332553d58759beb34ae3efa7a7 /ms/blueprintsprocessor/modules/commons/message-lib/src/test
parent6e7cbbbc4668c9d37d44bab6625ab7275043eb72 (diff)
Secure Kafka Authentication
Implementation of kafka secure authentication : - SSL - SASL(SCRAM) & SSL Issue-ID: CCSDK-2313 Change-Id: I4b2fc7abab7478e360ebf461608a620d75708f54 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt60
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt85
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt72
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt)18
4 files changed, 204 insertions, 31 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
index b10e1023b..612a57d23 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -27,17 +27,27 @@ import kotlin.test.assertNotNull
class MessagePropertiesDSLTest {
@Test
- fun testMessageProducerDSL() {
+ fun testScramSslMessageProducerDSL() {
val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
topologyTemplate {
- relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") {
- kafkaBasicAuth {
+ relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") {
+ kafkaScramSslAuth {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
acks("all")
retries(3)
enableIdempotence(true)
topic("sample-topic")
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
}
@@ -50,27 +60,27 @@ class MessagePropertiesDSLTest {
val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
assertNotNull(relationshipTemplates, "failed to get relationship templates")
assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match")
- assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
+ assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
val relationshipTypes = serviceTemplate.relationshipTypes
assertNotNull(relationshipTypes, "failed to get relationship types")
assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match")
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
)
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
)
}
@Test
- fun testMessageConsumerDSL() {
+ fun testScramSslAuthMessageConsumerDSL() {
val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
topologyTemplate {
- relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") {
- kafkaBasicAuth {
+ relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") {
+ kafkaScramSslAuth {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
groupId("sample-group-id")
@@ -79,15 +89,35 @@ class MessagePropertiesDSLTest {
autoOffsetReset("latest")
pollMillSec(5000)
pollRecords(20)
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
- relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") {
- kafkaStreamsBasicAuth {
+ relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") {
+ kafkaStreamsScramSslAuth {
bootstrapServers("sample-bootstrapServers")
applicationId("sample-application-id")
autoOffsetReset("latest")
processingGuarantee(StreamsConfig.EXACTLY_ONCE)
topic("sample-streaming-topic")
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
}
@@ -100,8 +130,8 @@ class MessagePropertiesDSLTest {
val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
assertNotNull(relationshipTemplates, "failed to get relationship templates")
assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match")
- assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
- assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth")
+ assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
+ assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth")
val relationshipTypes = serviceTemplate.relationshipTypes
assertNotNull(relationshipTypes, "failed to get relationship types")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 823ba7dee..ac08dc7b7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -52,18 +63,30 @@ import kotlin.test.assertTrue
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
"blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+ "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
]
)
open class BlueprintMessageConsumerServiceTest {
@@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerService() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerWithDynamicFunction() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest {
}
}
+ @Test
+ fun testKafkaScramSslAuthConfig() {
+
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
+
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
+
+ val configProps = messageConsumerProperties.getConfig()
+
+ assertEquals(messageConsumerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(messageConsumerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value")
+
+ expectedConfig.forEach {
+ assertTrue(configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}")
+ assertEquals(configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
fun testKafkaIntegration() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val channel = blueprintMessageConsumerService.subscribe(null)
@@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
launch {
repeat(5) {
delay(100)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index b824189d2..72a47ed56 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -20,12 +20,22 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
@@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.Future
import kotlin.test.Test
+import kotlin.test.assertEquals
import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@@ -43,10 +54,16 @@ import kotlin.test.assertTrue
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
]
)
open class BlueprintMessageProducerServiceTest {
@@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Test
- fun testKafkaBasicAuthProducertService() {
+ fun testKafkaScramSslAuthProducerService() {
runBlocking {
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
@@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest {
assertTrue(response, "failed to get command response")
}
}
+
+ @Test
+ fun testKafkaScramSslAuthConfig() {
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+ ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
+
+ val messageProducerProperties = bluePrintMessageLibPropertyService
+ .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
+
+ val configProps = messageProducerProperties.getConfig()
+
+ assertEquals(messageProducerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(messageProducerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value")
+
+ expectedConfig.forEach {
+ assertTrue(configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 1657d70b4..c30ab9b02 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -47,19 +47,27 @@ import kotlin.test.assertNotNull
@TestPropertySource(
properties =
[
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
"blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
- "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword"
]
)
-class KafkaStreamsBasicAuthConsumerServiceTest {
+class KafkaStreamsConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -117,7 +125,7 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
launch {
repeat(5) {
delay(1000)