aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-23 11:11:45 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-24 08:50:12 +0200
commit41079f4321ce0d96866078201b02bf4290dfa13f (patch)
tree45b36e6b9dadd7bfe3b6017c110a6289bd4afaca /sources/hv-collector-core/src/test
parent4988554ea65db50dbbb50c8c80171f7910548571 (diff)
Use AAF credentials from stream definition
Change-Id: I4fc20c116c60f6e7d46215a32c33884cd957e93b Issue-ID: DCAEGEN2-1448 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt119
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt2
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt2
3 files changed, 121 insertions, 2 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt
new file mode 100644
index 00000000..fec17856
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactoryTest.kt
@@ -0,0 +1,119 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.config.SaslConfigs
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.TestContainer
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass
+import reactor.kafka.sender.SenderOptions
+import java.io.IOException
+import java.io.StreamTokenizer
+import java.io.StringReader
+import java.util.*
+import javax.security.auth.login.AppConfigurationEntry
+import javax.security.auth.login.Configuration
+
+/**
+ * @author [Piotr Jaszczyk](mailto:piotr.jaszczyk@nokia.com)
+ * @since April 2019
+ */
+internal class KafkaSenderOptionsFactoryTest : Spek({
+ describe("creation of Kafka Sender options") {
+
+ given("unauthenticated KafkaSink") {
+ val sink = ImmutableKafkaSink.builder()
+ .bootstrapServers("dmaap1,dmaap2")
+ .topicName("PERF_DATA")
+ .build()
+
+ on("calling the CUT method") {
+ val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
+ val itShouldHavePropertySet = propertyChecker(result)
+
+ itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
+ itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
+ itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
+ itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
+ itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
+
+ itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, null)
+ itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, null)
+ itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG, null)
+ }
+
+ }
+ given("authenticated KafkaSink") {
+ val aafCredentials = ImmutableAafCredentials.builder()
+ .username("user \" with quote")
+ .password("password \" with quote")
+ .build()
+ val sink = ImmutableKafkaSink.builder()
+ .bootstrapServers("dmaap-service")
+ .topicName("OTHER_TOPIC")
+ .aafCredentials(aafCredentials)
+ .build()
+
+ on("calling the CUT method") {
+ val result = KafkaSenderOptionsFactory.createSenderOptions(sink)
+ val itShouldHavePropertySet = propertyChecker(result)
+
+ itShouldHavePropertySet(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sink.bootstrapServers())
+ itShouldHavePropertySet(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_258_291)
+ itShouldHavePropertySet(ProducerConfig.BUFFER_MEMORY_CONFIG, 33_554_432)
+ itShouldHavePropertySet(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ itShouldHavePropertySet(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ itShouldHavePropertySet(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ itShouldHavePropertySet(ProducerConfig.RETRIES_CONFIG, 1)
+ itShouldHavePropertySet(ProducerConfig.ACKS_CONFIG, "1")
+
+ itShouldHavePropertySet(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
+ itShouldHavePropertySet(SaslConfigs.SASL_MECHANISM, "PLAIN")
+ itShouldHavePropertySet(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ """username="user \" with quote" password="password \" with quote";""")
+ }
+
+ }
+
+ }
+})
+
+private fun TestContainer.propertyChecker(actual: SenderOptions<VesEventOuterClass.CommonEventHeader, VesMessage>) =
+ { property: String, expectedValue: Any? ->
+ it("should have '$property' property set to '$expectedValue'") {
+ assertThat(actual.producerProperty(property)).isEqualTo(expectedValue)
+ }
+ }
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
index 63caaf0a..c799a23c 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
@@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
-class ProtobufSerializerTest : Spek({
+internal class ProtobufSerializerTest : Spek({
describe("ProtobufSerializerTest") {
val serializer = ProtobufSerializer()
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
index d11e5569..975ed827 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
@@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
-class VesMessageSerializerTest : Spek({
+internal class VesMessageSerializerTest : Spek({
describe("VesMessageSerializer") {
val serializer = VesMessageSerializer()