aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt34
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt25
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt19
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt4
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt13
9 files changed, 80 insertions, 67 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 84310802..782d2324 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,13 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.utils.Closeable
import reactor.core.publisher.Flux
@@ -48,5 +48,5 @@ interface SinkProvider : Closeable {
}
interface ConfigurationProvider {
- operator fun invoke(): Flux<CollectorConfiguration>
+ operator fun invoke(): Flux<Routing>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 3ea14385..c08df748 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -20,13 +20,12 @@
package org.onap.dcae.collectors.veshv.factory
import arrow.core.Option
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val maximumPayloadSizeBytes: Int,
+ private val maxPayloadSizeBytes: Int,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config: AtomicReference<CollectorConfiguration> = AtomicReference()
+ val config = AtomicReference<Routing>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(config.routing, ctx),
+ router = Router(routing, ctx),
sink = sinkProvider(ctx),
metrics = metrics)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index 58a8599a..6c4e4671 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
/**
@@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
* @since May 2018
*/
object ServerFactory {
- fun createNettyTcpServer(serverConfiguration: ServerConfiguration,
+
+ private val sslFactory = SslContextFactory()
+
+ fun createNettyTcpServer(serverConfig: ServerConfiguration,
+ securityConfig: SecurityConfiguration,
collectorProvider: CollectorProvider,
- metrics: Metrics): Server =
- NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics)
+ metrics: Metrics
+ ): Server = NettyTcpServer(
+ serverConfig,
+ sslFactory.createServerContext(securityConfig),
+ collectorProvider,
+ metrics
+ )
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index a853839a..c362020e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
@@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(dummyMode: Boolean,
- kafkaConfig: KafkaConfiguration): SinkProvider =
- if (dummyMode)
+ fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
+ if (config.dummyMode)
LoggingSinkProvider()
else
- KafkaSinkProvider(kafkaConfig)
+ KafkaSinkProvider(config)
- fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
ConfigurationProviderImpl(
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- configurationProviderParams)
+ config)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index 51b6d4f0..754a2efc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
@@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
retrySpec: Retry<Any>
) : ConfigurationProvider {
- constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+ constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
cbsClientMono,
params.firstRequestDelay,
params.requestInterval,
@@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<CollectorConfiguration> =
+ override fun invoke(): Flux<Routing> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
.updates(RequestDiagnosticContext.create(),
firstRequestDelay,
requestInterval)
@@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.retryWhen(retry)
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ private fun createCollectorConfiguration(configuration: JsonObject): Routing =
try {
val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
- CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject
- defineRoute {
- fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- )
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject
+ defineRoute {
+ fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
} catch (e: NullPointerException) {
throw ParsingException("Failed to parse configuration", e)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index f52890b0..96e45a02 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
@@ -46,7 +46,7 @@ import java.lang.Integer.max
internal class KafkaSinkProvider internal constructor(
private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
- constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+ constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
@@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor(
private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
private const val BUFFER_MEMORY_MULTIPLIER = 32
private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
- private fun constructKafkaSender(config: KafkaConfiguration) =
+
+ private fun constructKafkaSender(config: CollectorConfiguration) =
KafkaSender.create(constructSenderOptions(config))
- private fun constructSenderOptions(config: KafkaConfiguration) =
+ private fun constructSenderOptions(config: CollectorConfiguration) =
SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
- .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config))
- .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config))
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
+ .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
+ .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
.producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
.producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
.producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
@@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor(
.producerProperty(ACKS_CONFIG, "1")
.stopOnError(false)
- private fun maxRequestSize(config: KafkaConfiguration) =
- (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt()
+ private fun maxRequestSize(maxRequestSizeBytes: Int) =
+ (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
- private fun bufferMemory(config: KafkaConfiguration) =
- max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes)
+ private fun bufferMemory(maxRequestSizeBytes: Int) =
+ max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 123956ad..fab96560 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.core.Option
import arrow.core.getOrElse
import arrow.effects.IO
+import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -41,6 +42,7 @@ import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
import java.net.InetAddress
+import java.net.InetSocketAddress
import java.time.Duration
@@ -48,14 +50,14 @@ import java.time.Duration
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
- private val sslContextFactory: SslContextFactory,
+internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
+ private val sslContext: Option<SslContext>,
private val collectorProvider: CollectorProvider,
private val metrics: Metrics) : Server {
override fun start(): IO<ServerHandle> = IO {
TcpServer.create()
- .addressSupplier { serverConfig.serverListenAddress }
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
.configureSsl()
.handle(this::handleConnection)
.doOnUnbound {
@@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
}
private fun TcpServer.configureSsl() =
- sslContextFactory
- .createServerContext(serverConfig.securityConfiguration)
- .map { sslContext ->
+ sslContext
+ .map { serverContext ->
logger.info { "Collector configured with SSL enabled" }
- this.secure { b -> b.sslContext(sslContext) }
+ this.secure { it.sslContext(serverContext) }
}.getOrElse {
logger.info { "Collector configured with SSL disabled" }
this
@@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
withConnectionFrom(nettyInbound) { connection ->
connection
- .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
.logConnectionClosed(clientContext)
}.run {
collector.handleConnection(nettyInbound.createDataStream())
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index 21aaa129..f830f2c9 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val route1 = it.routing.routes[0]
+ val route1 = it.routes[0]
assertThat(FAULT.domainName)
.describedAs("routed domain 1")
.isEqualTo(route1.domain)
@@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({
.describedAs("target topic 1")
.isEqualTo(route1.targetTopic)
- val route2 = it.routing.routes[1]
+ val route2 = it.routes[1]
assertThat(HEARTBEAT.domainName)
.describedAs("routed domain 2")
.isEqualTo(route2.domain)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index 068476ad..1e3f2e7a 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.ves.VesEventOuterClass
import reactor.kafka.sender.KafkaSender
@@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender
internal object KafkaSinkProviderTest : Spek({
describe("non functional requirements") {
given("sample configuration") {
- val config = KafkaConfiguration("localhost:9090",
- 1024 * 1024)
+ val config = CollectorConfiguration(
+ dummyMode = false,
+ maxRequestSizeBytes = 1024 * 1024,
+ kafkaServers = "localhost:9090",
+ routing = routing { }.build())
+
val cut = KafkaSinkProvider(config)
on("sample clients") {