From 8a2eb4ae98beb70eac4e5fa4bb2e786c6a9513d2 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Mon, 13 Apr 2020 16:42:36 -0400 Subject: Secure Kafka Authentication Implementation of kafka secure authentication : - SSL - SASL(SCRAM) & SSL Issue-ID: CCSDK-2313 Change-Id: I4b2fc7abab7478e360ebf461608a620d75708f54 Signed-off-by: Julien Fontaine --- .../message/MessagePropertiesDSLTest.kt | 60 ++++++--- .../service/BlueprintMessageConsumerServiceTest.kt | 85 +++++++++++- .../service/BlueprintMessageProducerServiceTest.kt | 72 +++++++++- .../KafkaStreamsBasicAuthConsumerServiceTest.kt | 137 ------------------- .../service/KafkaStreamsConsumerServiceTest.kt | 145 +++++++++++++++++++++ 5 files changed, 336 insertions(+), 163 deletions(-) delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt index b10e1023b..612a57d23 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -27,17 +27,27 @@ import kotlin.test.assertNotNull class MessagePropertiesDSLTest { @Test - fun testMessageProducerDSL() { + fun testScramSslMessageProducerDSL() { val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { topologyTemplate { - relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") { - kafkaBasicAuth { + relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") { + kafkaScramSslAuth { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") acks("all") retries(3) enableIdempotence(true) topic("sample-topic") + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } } @@ -50,27 +60,27 @@ class MessagePropertiesDSLTest { val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates assertNotNull(relationshipTemplates, "failed to get relationship templates") assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match") - assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") + assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth") val relationshipTypes = serviceTemplate.relationshipTypes assertNotNull(relationshipTypes, "failed to get relationship types") assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match") assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" ) assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" ) } @Test - fun testMessageConsumerDSL() { + fun testScramSslAuthMessageConsumerDSL() { val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { topologyTemplate { - relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") { - kafkaBasicAuth { + relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") { + kafkaScramSslAuth { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") groupId("sample-group-id") @@ -79,15 +89,35 @@ class MessagePropertiesDSLTest { autoOffsetReset("latest") pollMillSec(5000) pollRecords(20) + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } - relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") { - kafkaStreamsBasicAuth { + relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") { + kafkaStreamsScramSslAuth { bootstrapServers("sample-bootstrapServers") applicationId("sample-application-id") autoOffsetReset("latest") processingGuarantee(StreamsConfig.EXACTLY_ONCE) topic("sample-streaming-topic") + truststore("/path/to/truststore.jks") + truststorePassword("secretpassword") + truststoreType("JKS") + keystore("/path/to/keystore.jks") + keystorePassword("secretpassword") + keystoreType("JKS") + sslEndpointIdentificationAlgorithm("") + saslMechanism("SCRAM-SHA-512") + scramUsername("sample-user") + scramPassword("secretpassword") } } } @@ -100,8 +130,8 @@ class MessagePropertiesDSLTest { val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates assertNotNull(relationshipTemplates, "failed to get relationship templates") assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match") - assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") - assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth") + assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth") + assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth") val relationshipTypes = serviceTemplate.relationshipTypes assertNotNull(relationshipTypes, "failed to get relationship types") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 823ba7dee..ac08dc7b7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.MockConsumer import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.StringDeserializer import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -52,18 +63,30 @@ import kotlin.test.assertTrue ) @TestPropertySource( properties = - ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", "blueprintsprocessor.messageconsumer.sample.topic=default-topic", "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", "blueprintsprocessor.messageconsumer.sample.pollRecords=1", + "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" ] ) open class BlueprintMessageConsumerServiceTest { @@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerService() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerWithDynamicFunction() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaScramSslAuthConfig() { + + val expectedConfig = mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ConsumerConfig.GROUP_ID_CONFIG to "sample-group", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) + + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample") + + val configProps = messageConsumerProperties.getConfig() + + assertEquals(messageConsumerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals(messageConsumerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value") + + expectedConfig.forEach { + assertTrue(configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}") + assertEquals(configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test fun testKafkaIntegration() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val channel = blueprintMessageConsumerService.subscribe(null) @@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest { /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService launch { repeat(5) { delay(100) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index b824189d2..72a47ed56 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,12 +20,22 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringSerializer import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration @@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import java.util.concurrent.Future import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue @RunWith(SpringRunner::class) @@ -43,10 +54,16 @@ import kotlin.test.assertTrue ) @TestPropertySource( properties = - ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" ] ) open class BlueprintMessageProducerServiceTest { @@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Test - fun testKafkaBasicAuthProducertService() { + fun testKafkaScramSslAuthProducerService() { runBlocking { val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService val mockKafkaTemplate = mockk>() @@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest { assertTrue(response, "failed to get command response") } } + + @Test + fun testKafkaScramSslAuthConfig() { + val expectedConfig = mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) + + val messageProducerProperties = bluePrintMessageLibPropertyService + .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample") + + val configProps = messageProducerProperties.getConfig() + + assertEquals(messageProducerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals(messageProducerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value") + + expectedConfig.forEach { + assertTrue(configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" + ) + assertEquals(configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + ) + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt deleted file mode 100644 index 1657d70b4..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.apache.kafka.streams.processor.Processor -import org.apache.kafka.streams.processor.ProcessorSupplier -import org.apache.kafka.streams.state.Stores -import org.junit.Test -import org.junit.runner.RunWith -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.TestPropertySource -import org.springframework.test.context.junit4.SpringRunner -import kotlin.test.assertNotNull - -@RunWith(SpringRunner::class) -@DirtiesContext -@ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] -) -@TestPropertySource( - properties = - [ - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - - "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", - "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", - "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" - - ] -) -class KafkaStreamsBasicAuthConsumerServiceTest { - - @Autowired - lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService - - @Test - fun testProperties() { - val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") - assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService") - } - - /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - // @Test - fun testKafkaStreamingMessageConsumer() { - runBlocking { - val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") - - // Dynamic Consumer Function to create Topology - val consumerFunction = object : KafkaStreamConsumerFunction { - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map? - ): Topology { - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") - topology.addSource("Source", *topics.toTypedArray()) - // Processor Supplier - val firstProcessorSupplier = object : ProcessorSupplier { - override fun get(): Processor { - return FirstProcessor() - } - } - val changelogConfig: MutableMap = hashMapOf() - changelogConfig.put("min.insync.replicas", "1") - - // Store Buolder - val countStoreSupplier = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("PriorityMessageState"), - Serdes.String(), - PriorityMessageSerde() - ) - .withLoggingEnabled(changelogConfig) - - topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") - topology.addStateStore(countStoreSupplier, "FirstProcessor") - topology.addSink( - "SINK", "default-stream-topic-out", Serdes.String().serializer(), - PriorityMessageSerde().serializer(), "FirstProcessor" - ) - return topology - } - } - - /** Send message with every 1 sec */ - val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService - launch { - repeat(5) { - delay(1000) - val headers: MutableMap = hashMapOf() - headers["id"] = it.toString() - blueprintMessageProducerService.sendMessageNB( - message = "this is my message($it)", - headers = headers - ) - } - } - streamingConsumerService.consume(null, consumerFunction) - delay(10000) - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt new file mode 100644 index 000000000..c30ab9b02 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -0,0 +1,145 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.apache.kafka.streams.state.Stores +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@DirtiesContext +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + [ + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword", + + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword" + + ] +) +class KafkaStreamsConsumerServiceTest { + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testProperties() { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService") + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + // @Test + fun testKafkaStreamingMessageConsumer() { + runBlocking { + val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + + // Dynamic Consumer Function to create Topology + val consumerFunction = object : KafkaStreamConsumerFunction { + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + topology.addSource("Source", *topics.toTypedArray()) + // Processor Supplier + val firstProcessorSupplier = object : ProcessorSupplier { + override fun get(): Processor { + return FirstProcessor() + } + } + val changelogConfig: MutableMap = hashMapOf() + changelogConfig.put("min.insync.replicas", "1") + + // Store Buolder + val countStoreSupplier = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("PriorityMessageState"), + Serdes.String(), + PriorityMessageSerde() + ) + .withLoggingEnabled(changelogConfig) + + topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") + topology.addStateStore(countStoreSupplier, "FirstProcessor") + topology.addSink( + "SINK", "default-stream-topic-out", Serdes.String().serializer(), + PriorityMessageSerde().serializer(), "FirstProcessor" + ) + return topology + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaMessageProducerService + launch { + repeat(5) { + delay(1000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB( + message = "this is my message($it)", + headers = headers + ) + } + } + streamingConsumerService.consume(null, consumerFunction) + delay(10000) + streamingConsumerService.shutDown() + } + } +} -- cgit 1.2.3-korg