aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib
diff options
context:
space:
mode:
authordavid.mcweeney <david.mcweeney@est.tech>2022-02-17 17:07:59 +0000
committerdavid.mcweeney <david.mcweeney@est.tech>2022-03-02 16:56:59 +0000
commit3cba071c0d52c446db67f3a73db2d31e594d1653 (patch)
treefae01c1f73544670dc1e72538cee617975133c81 /ms/blueprintsprocessor/modules/commons/message-lib
parent4db0643d321940dc0677f6953cb5701fb8ffb26b (diff)
DMAAP-1706 - New Kafka Auth option
Change-Id: I8533721d23d6adb41f65cb96fb2b8f852bda47b8 Signed-off-by: david.mcweeney <david.mcweeney@est.tech> Issue-ID: DMAAP-1706
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt37
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt28
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt62
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt64
5 files changed, 192 insertions, 1 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index 659295a6b..249af8812 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -1,6 +1,7 @@
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -67,5 +68,6 @@ class MessageLibConstants {
const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth"
const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth"
+ const val TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH = "kafka-scram-plain-text-auth"
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 3e7db9597..886c87c0b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -1,6 +1,7 @@
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -117,6 +118,24 @@ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerPr
}
}
+/** (SASL) SCRAM Plaintext Auth */
+class KafkaScramPlainTextAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
+
/** Consumer */
abstract class MessageConsumerProperties : CommonProperties()
/** Kafka Streams */
@@ -265,3 +284,21 @@ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerPr
return configProps
}
}
+
+/** (SASL) SCRAM Plaintext Auth */
+class KafkaScramPlaintextAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_PLAINTEXT.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index c0cf51b93..d10f2d43b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -1,6 +1,7 @@
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +23,8 @@ import io.micrometer.core.instrument.MeterRegistry
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlainTextAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramPlaintextAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
@@ -71,6 +74,11 @@ open class BluePrintMessageLibPropertyService(
prefix, KafkaScramSslAuthMessageProducerProperties::class.java
)
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramPlainTextAuthMessageProducerProperties::class.java
+ )
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -89,6 +97,9 @@ open class BluePrintMessageLibPropertyService(
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramPlainTextAuthMessageProducerProperties::class.java)!!
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -130,6 +141,12 @@ open class BluePrintMessageLibPropertyService(
prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
)
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramPlaintextAuthMessageConsumerProperties::class.java
+ )
+ }
+
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
@@ -146,6 +163,7 @@ open class BluePrintMessageLibPropertyService(
prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
)
}
+
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -165,6 +183,9 @@ open class BluePrintMessageLibPropertyService(
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramPlaintextAuthMessageConsumerProperties::class.java)!!
+ }
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
@@ -175,6 +196,7 @@ open class BluePrintMessageLibPropertyService(
MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
}
+
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -204,6 +226,12 @@ open class BluePrintMessageLibPropertyService(
meterRegistry
)
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_PLAIN_TEXT_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaScramPlaintextAuthMessageConsumerProperties,
+ meterRegistry
+ )
+ }
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
return KafkaStreamsConsumerService(
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 9c37b9d13..37d8b24fb 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -1,6 +1,7 @@
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -83,6 +84,16 @@ import kotlin.test.assertTrue
"blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
"blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample2.type=kafka-scram-plain-text-auth",
+ "blueprintsprocessor.messageconsumer.sample2.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample2.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample2.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample2.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample2.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample2.pollRecords=1",
+ "blueprintsprocessor.messageconsumer.sample2.scramUsername=sample-user",
+ "blueprintsprocessor.messageconsumer.sample2.scramPassword=secretpassword",
+
"blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
@@ -271,6 +282,57 @@ open class BlueprintMessageConsumerServiceTest {
}
}
+
+ @Test
+ fun testKafkaScramPlaintextAuthConfig() {
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
+ )
+
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample2")
+
+ val configProps = messageConsumerProperties.getConfig()
+
+ assertEquals(
+ messageConsumerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(
+ messageConsumerProperties.type,
+ "kafka-scram-plain-text-auth",
+ "Authentication type doesn't match the expected value"
+ )
+
+ assertTrue(
+ configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+ "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+ )
+ assertTrue(
+ configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+ "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+ )
+
+ expectedConfig.forEach {
+ assertTrue(
+ configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(
+ configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
fun testKafkaIntegration() {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 87819f677..bb35b6614 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -1,6 +1,7 @@
/*
* Copyright © 2019 IBM.
* Modifications Copyright © 2021 Bell Canada.
+ * Modification Copyright (C) 2022 Nordix Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +39,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants.Companion.PROPERTY_MESSAGE_PRODUCER_PREFIX
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.annotation.DirtiesContext
@@ -69,9 +71,17 @@ import kotlin.test.assertTrue
"blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
"blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
"blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
- "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
+
+ "blueprintsprocessor.messageproducer.sample2.type=kafka-scram-plain-text-auth",
+ "blueprintsprocessor.messageproducer.sample2.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample2.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample2.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample2.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample2.scramPassword=secretpassword"
]
)
+
open class BlueprintMessageProducerServiceTest {
@Autowired
@@ -163,4 +173,56 @@ open class BlueprintMessageProducerServiceTest {
)
}
}
+
+ @Test
+ fun testKafkaScramPlaintextAuthConfig() {
+
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+ ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+ ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
+ )
+
+ val messageProducerProperties = bluePrintMessageLibPropertyService
+ .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample2")
+
+ val configProps = messageProducerProperties.getConfig()
+
+ assertEquals(
+ messageProducerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(
+ messageProducerProperties.type,
+ "kafka-scram-plain-text-auth",
+ "Authentication type doesn't match the expected value"
+ )
+
+ assertTrue(
+ configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+ "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+ )
+ assertTrue(
+ configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+ "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+ )
+
+ expectedConfig.forEach {
+ assertTrue(
+ configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(
+ configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
}