diff options
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka')
-rw-r--r-- | sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt | 73 |
1 files changed, 49 insertions, 24 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt index fec17856..37d67838 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt @@ -3,6 +3,7 @@ * dcaegen2-collectors-veshv * ================================================================================ * Copyright (C) 2019 NOKIA + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,29 +20,24 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import io.kotest.extensions.system.OverrideMode import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.KafkaException import org.apache.kafka.common.config.SaslConfigs import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.TestContainer -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.ves.VesEventOuterClass import reactor.kafka.sender.SenderOptions -import java.io.IOException -import java.io.StreamTokenizer -import java.io.StringReader -import java.util.* -import javax.security.auth.login.AppConfigurationEntry -import javax.security.auth.login.Configuration +import io.kotest.extensions.system.withEnvironment +import io.kotest.matchers.shouldBe +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.on +import org.jetbrains.spek.api.dsl.TestContainer +import org.jetbrains.spek.api.dsl.it /** * @author [Piotr Jaszczyk](mailto:piotr.jaszczyk@nokia.com) @@ -55,7 +51,6 @@ internal class KafkaSenderOptionsFactoryTest : Spek({ .bootstrapServers("dmaap1,dmaap2") .topicName("PERF_DATA") .build() - on("calling the CUT method") { val result = KafkaSenderOptionsFactory.createSenderOptions(sink) val itShouldHavePropertySet = propertyChecker(result) @@ -68,22 +63,54 @@ internal class KafkaSenderOptionsFactoryTest : Spek({ itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1) itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1") - itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, null) itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, null) itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, null) } } - given("authenticated KafkaSink") { + given("authenticated AAF KafkaSink") { val aafCredentials = ImmutableAafCredentials.builder() - .username("user \" with quote") - .password("password \" with quote") - .build() + .username("user \" with quote") + .password("password \" with quote") + .build() + + val sink = ImmutableKafkaSink.builder() + .bootstrapServers("dmaap-service") + .topicName("OTHER_TOPIC") + .aafCredentials(aafCredentials) + .build() + + on("calling the CUT method") { + val result = KafkaSenderOptionsFactory.createSenderOptions(sink) + val itShouldHavePropertySet = propertyChecker(result) + + itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers()) + itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291) + itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432) + itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1) + itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1") + itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") + itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN") + itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.plain.PlainLoginModule required " + + """username="user \" with quote" password="password \" with quote";""") + } + + } + given("authenticated SCRAM KafkaSink") { + withEnvironment("USE_SCRAM", "true", OverrideMode.SetOrOverride) { + System.getenv("USE_SCRAM") shouldBe "true" + } + withEnvironment("JAAS_CONFIG", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";", OverrideMode.SetOrOverride) { + System.getenv("JAAS_CONFIG") shouldBe "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";" + } val sink = ImmutableKafkaSink.builder() .bootstrapServers("dmaap-service") .topicName("OTHER_TOPIC") - .aafCredentials(aafCredentials) .build() on("calling the CUT method") { @@ -98,12 +125,10 @@ internal class KafkaSenderOptionsFactoryTest : Spek({ itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1) itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1") - itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") - itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN") + itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512") itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, - "org.apache.kafka.common.security.plain.PlainLoginModule required " + - """username="user \" with quote" password="password \" with quote";""") + "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";") } } |