aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt73
1 files changed, 49 insertions, 24 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt
index fec17856..37d67838 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt
@@ -3,6 +3,7 @@
* dcaegen2-collectors-veshv
* ================================================================================
* Copyright (C) 2019 NOKIA
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,29 +20,24 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import io.kotest.extensions.system.OverrideMode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.config.SaslConfigs
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.TestContainer
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.ves.VesEventOuterClass
import reactor.kafka.sender.SenderOptions
-import java.io.IOException
-import java.io.StreamTokenizer
-import java.io.StringReader
-import java.util.*
-import javax.security.auth.login.AppConfigurationEntry
-import javax.security.auth.login.Configuration
+import io.kotest.extensions.system.withEnvironment
+import io.kotest.matchers.shouldBe
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.on
+import org.jetbrains.spek.api.dsl.TestContainer
+import org.jetbrains.spek.api.dsl.it
/**
* @author [Piotr Jaszczyk](mailto:piotr.jaszczyk@nokia.com)
@@ -55,7 +51,6 @@ internal class KafkaSenderOptionsFactoryTest : Spek({
.bootstrapServers("dmaap1,dmaap2")
.topicName("PERF_DATA")
.build()
-
on("calling the CUT method") {
val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
val itShouldHavePropertySet = propertyChecker(result)
@@ -68,22 +63,54 @@ internal class KafkaSenderOptionsFactoryTest : Spek({
itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
-
itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, null)
itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, null)
itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, null)
}
}
- given("authenticated KafkaSink") {
+ given("authenticated AAF KafkaSink") {
val aafCredentials = ImmutableAafCredentials.builder()
- .username("user \" with quote")
- .password("password \" with quote")
- .build()
+ .username("user \" with quote")
+ .password("password \" with quote")
+ .build()
+
+ val sink = ImmutableKafkaSink.builder()
+ .bootstrapServers("dmaap-service")
+ .topicName("OTHER_TOPIC")
+ .aafCredentials(aafCredentials)
+ .build()
+
+ on("calling the CUT method") {
+ val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
+ val itShouldHavePropertySet = propertyChecker(result)
+
+ itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
+ itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
+ itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
+ itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
+ itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
+ itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
+ itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
+ itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ """username="user \" with quote" password="password \" with quote";""")
+ }
+
+ }
+ given("authenticated SCRAM KafkaSink") {
+ withEnvironment("USE_SCRAM", "true", OverrideMode.SetOrOverride) {
+ System.getenv("USE_SCRAM") shouldBe "true"
+ }
+ withEnvironment("JAAS_CONFIG", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";", OverrideMode.SetOrOverride) {
+ System.getenv("JAAS_CONFIG") shouldBe "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";"
+ }
val sink = ImmutableKafkaSink.builder()
.bootstrapServers("dmaap-service")
.topicName("OTHER_TOPIC")
- .aafCredentials(aafCredentials)
.build()
on("calling the CUT method") {
@@ -98,12 +125,10 @@ internal class KafkaSenderOptionsFactoryTest : Spek({
itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
-
itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
- itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
+ itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
- "org.apache.kafka.common.security.plain.PlainLoginModule required " +
- """username="user \" with quote" password="password \" with quote";""")
+ "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"onap-dcae-hv-ves-kafka-user\" password=\"oJumEmQAH6kN\";")
}
}