diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-26 14:21:02 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-28 14:16:02 +0100 |
commit | 2174a045086e16611128b20a6d4357c04d9eac4a (patch) | |
tree | 6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-core | |
parent | 1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff) |
Redefine Routing
As all needed information to route messege is contained inside of
KafkaSink message, we can simply put this object as part of single Route.
Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
12 files changed, 288 insertions, 282 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 0a1e2d43..1b92d90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux -interface Sink { +interface Sink : Closeable { + fun send(message: RoutedMessage) = send(Flux.just(message)) + fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> } +interface SinkProvider : Closeable { + operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> +} + +typealias ConfigurationProvider = () -> Flux<Routing> + interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) @@ -42,11 +51,3 @@ interface Metrics { fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } - -interface SinkProvider : Closeable { - operator fun invoke(ctx: ClientContext): Sink -} - -interface ConfigurationProvider { - operator fun invoke(): Flux<Sequence<KafkaSink>> -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index fa4f9670..2b29acd9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Sequence<KafkaSink>>() + val config = AtomicReference<Routing>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(kafkaSinks, ctx), - sink = sinkProvider(ctx), + router = Router(routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) } } - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index a95a44d5..6e2e20f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,39 +19,75 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.None import arrow.core.toOption +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux -class Router(private val routing: Routing, private val ctx: ClientContext) { - - constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( - Routing( - kafkaSinks.map { Route(it.name(), it.topicName()) }.toList() - ), - ctx) +class Router internal constructor(private val routing: Routing, + private val messageSinks: Map<String, Lazy<Sink>>, + private val ctx: ClientContext, + private val metrics: Metrics) { + constructor(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext, + metrics: Metrics) : + this(routing, + constructMessageSinks(routing, sinkProvider, ctx), + ctx, + metrics) { + logger.debug(ctx::mdc) { "Routing for client: $routing" } + logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" } + } - fun findDestination(message: VesMessage): Option<RoutedMessage> = + fun route(message: VesMessage): Flux<ConsumedMessage> = routeFor(message.header) - .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } } - .map { it.routeMessage(message) } + .fold({ + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } + Flux.empty<Route>() + }, { + logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } + Flux.just(it) + }) + .flatMap { + val sinkTopic = it.sink.topicName() + messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) + } - private fun Route.routeMessage(message: VesMessage) = - RoutedMessage(targetTopic, partitioning(message.header), message) + private fun routeFor(header: CommonEventHeader) = + routing.find { it.domain == header.domain }.toOption() - private fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - routing.routes.find { it.domain == commonHeader.domain }.toOption() + private fun messageSinkFor(sinkTopic: String) = messageSinks + .getOrElse(sinkTopic) { + throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic") + } companion object { - private val logger = Logger(Routing::class) + private val logger = Logger(Router::class) + private val NONE_PARTITION = None + + internal fun constructMessageSinks(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext) = + routing.map(Route::sink) + .distinctBy { it.topicName() } + .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) } + + private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message) } + +internal class MissingMessageSinkException(msg: String) : Throwable(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 6a2792c3..433e4d57 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -53,7 +49,6 @@ internal class VesHvCollector( private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, - private val sink: Sink, private val metrics: Metrics) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = @@ -62,10 +57,10 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) } + // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here + .handleErrors() + .transform(::route) + .handleErrors() .doFinally { releaseBuffersMemory() } .then() @@ -98,18 +93,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux - .flatMap(this::findRoute) - .compose(sink::send) + private fun route(flux: Flux<VesMessage>) = flux + .flatMap(router::route) .doOnNext(this::updateSinkMetrics) - private fun findRoute(msg: VesMessage) = router - .findDestination(msg) - .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) } - .filterEmptyWithLog(logger, clientContext::fullMdc, - { "Found route for message: ${it.topic}, partition: ${it.partition}" }, - { "Could not find route for message" }) - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { when (consumedMessage) { is SuccessfullyConsumedMessage -> @@ -119,6 +106,11 @@ internal class VesHvCollector( } } + private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 10fe0a51..20b11753 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = - KafkaSinkProvider(config) + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 1f5df371..185693c0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<Sequence<KafkaSink>> = + override fun invoke(): Flux<Routing> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient + private fun handleUpdates(cbsClient: CbsClient) = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createCollectorConfiguration) + .map(::createRoutingDescription) .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } .retryWhen(retry) - - private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> = - try { - DataStreams.namedSinks(configuration) - .filter(StreamPredicates.streamOfType(StreamType.KAFKA)) - .map(streamParser::unsafeParse) - .asSequence() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - + private fun createRoutingDescription(configuration: JsonObject): Routing = try { + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 5052cc5c..2ce0f42f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext @@ -39,41 +40,39 @@ import reactor.kafka.sender.SenderRecord * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, - private val ctx: ClientContext) : Sink { +internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>, + private val ctx: ClientContext) : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = - messages.map(::vesToKafkaRecord).let { records -> - sender.send(records).map { - val msg = it.correlationMetadata() - if (it.exception() == null) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - "Message sent to Kafka with metadata: ${it.recordMetadata()}" + messages.map(::vesToKafkaRecord) + .compose { sender.send(it) } + .map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) + } else { + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } - SuccessfullyConsumedMessage(msg) - } else { - logger.warn(ctx::fullMdc, Marker.Invoke()) { - "Failed to send message to Kafka. Reason: ${it.exception().message}" - } - logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } - FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } - } - } private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> = SenderRecord.create( - routed.topic, - routed.partition, + routed.targetTopic, + routed.partition.orNull(), FILL_TIMESTAMP_LATER, routed.message.header, routed.message, routed) - internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender - companion object { private val FILL_TIMESTAMP_LATER: Long? = null - private val logger = Logger(KafkaSink::class) + private val logger = Logger(KafkaPublisher::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 96e45a02..7a498652 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,66 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.effects.IO -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.lang.Integer.max +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -internal class KafkaSinkProvider internal constructor( - private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider { - - constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) - - override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) +internal class KafkaSinkProvider : SinkProvider { + private val messageSinks = synchronizedMap( + mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>() + ) + + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { + KafkaPublisher(it, ctx) + } + } override fun close() = IO { - kafkaSender.close() - logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + messageSinks.values.forEach { it.close() } + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } } companion object { private val logger = Logger(KafkaSinkProvider::class) - private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f - private const val BUFFER_MEMORY_MULTIPLIER = 32 - private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - - private fun constructKafkaSender(config: CollectorConfiguration) = - KafkaSender.create(constructSenderOptions(config)) - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) - .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) - .producerProperty(RETRIES_CONFIG, 1) - .producerProperty(ACKS_CONFIG, "1") - .stopOnError(false) - - private fun maxRequestSize(maxRequestSizeBytes: Int) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - - private fun bufferMemory(maxRequestSizeBytes: Int) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt new file mode 100644 index 00000000..40de8c51 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.apache.kafka.clients.producer.ProducerConfig +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions + + +private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f +private const val BUFFER_MEMORY_MULTIPLIER = 32 +private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 + +internal fun createKafkaSender(sinkStream: SinkStream) = + (sinkStream as KafkaSink).let { kafkaSink -> + KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers()) + .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink)) + .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink)) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) + .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .stopOnError(false) + ) + } + +private fun maxRequestSize(kafkaSink: KafkaSink) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt() + +private fun bufferMemory(kafkaSink: KafkaSink) = + Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index b8b55865..6b9c6803 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -20,22 +20,30 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None -import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import reactor.core.publisher.Flux +import reactor.test.StepVerifier /** @@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame * @since May 2018 */ object RouterTest : Spek({ - given("sample configuration") { - val config = Routing(listOf( - Route(PERF3GPP.domainName, "ves_rtpm", { 2 }), - Route(SYSLOG.domainName, "ves_trace") - )) - val cut = Router(config, ClientContext()) - - on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() - } - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) - } + describe("Router") { - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) - } + whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } + val messageSinkMap = mapOf( + Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(syslogTopic, lazyOf(messageSinkMock)) + ) - on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + given("sample routing specification") { + val cut = router(defaultRouting, messageSinkMap) - it("should have route available") { - assertThat(result).isNotNull() - } + on("message with existing route (rtpm)") { + whenever(messageSinkMock.send(routedPerf3GppMessage)) + .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage)) - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) - } + it("should be properly routed") { + val result = cut.route(perf3gppMessage) - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) + assertThat(result).isNotNull() + StepVerifier.create(result) + .expectNext(successfullyConsumedPerf3gppMessage) + .verifyComplete() + + verify(perf3gppSinkMock).topicName() + verify(messageSinkMock).send(routedPerf3GppMessage) + } } - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + on("message with existing route (syslog)") { + whenever(messageSinkMock.send(routedSyslogMessage)) + .thenReturn(Flux.just(successfullyConsumedSyslogMessage)) + val result = cut.route(syslogMessage) + + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedSyslogMessage) + .verifyComplete() + + verify(syslogSinkMock).topicName() + verify(messageSinkMock).send(routedSyslogMessage) + } } - } - on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + on("message with unknown route") { + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val result = cut.route(message) - it("should not have route available") { - assertThat(result).isEqualTo(None) + it("should not have route available") { + StepVerifier.create(result).verifyComplete() + } } } } -})
\ No newline at end of file + +}) + +private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) = + Router(routing, kafkaPublisherMap, ClientContext(), mock()) + +private val perf3gppTopic = "PERF_PERF" +private val perf3gppSinkMock = mock<KafkaSink>() +private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) + +private val syslogTopic = "SYS_LOG" +private val syslogSinkMock = mock<KafkaSink>() +private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) + +private val messageSinkMock = mock<Sink>() +private val default_partition = None + +private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) +private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) + +private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) +private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt index 571a6680..8616ce03 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt @@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import reactor.core.publisher.Flux + import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({ .expectNoEvent(waitTime) } } - } + given("valid configuration from cbs") { val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) @@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val receivedSink1 = it.elementAt(0) - val receivedSink2 = it.elementAt(1) + val route1 = it.elementAt(0) + val route2 = it.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) assertThat(receivedSink1.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) assertThat(receivedSink2.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + }.verifyComplete() } } @@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({ }) + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + private val aafCredentials1 = ImmutableAafCredentials.builder() .username("client") .password("very secure password") @@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder() private val validConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", @@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse(""" "topic_name": "REG_HVVES_PERF3GPP" } }, - "perf3gpp_central": { + "$PERF3GPP_CENTRAL": { "type": "kafka", "aaf_credentials": { "username": "other_client", @@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse(""" private val invalidConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt deleted file mode 100644 index eb0a3173..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import arrow.syntax.collections.tail -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.ves.VesEventOuterClass -import reactor.kafka.sender.KafkaSender - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since December 2018 - */ -internal object KafkaSinkProviderTest : Spek({ - describe("non functional requirements") { - given("sample configuration") { - val config = CollectorConfiguration( - maxRequestSizeBytes = 1024 * 1024, - kafkaServers = "localhost:9090", - routing = Routing(emptyList()) - ) - - val cut = KafkaSinkProvider(config) - - on("sample clients") { - val clients = listOf( - ClientContext(), - ClientContext(), - ClientContext(), - ClientContext()) - - it("should create only one instance of KafkaSender") { - val sinks = clients.map(cut::invoke) - val firstSink = sinks[0] - val restOfSinks = sinks.tail() - - assertThat(restOfSinks).isNotEmpty - assertThat(restOfSinks).allSatisfy { sink -> - assertThat(firstSink.usesSameSenderAs(sink)) - .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") - .isTrue() - } - } - } - } - - given("dummy KafkaSender") { - val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock() - val cut = KafkaSinkProvider(kafkaSender) - - on("close") { - cut.close().unsafeRunSync() - - it("should close KafkaSender") { - verify(kafkaSender).close() - } - } - } - } -}) |