diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-26 14:21:02 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-28 14:16:02 +0100 |
commit | 2174a045086e16611128b20a6d4357c04d9eac4a (patch) | |
tree | 6302837fc6ce5fac26a9da91e7353247c397bc0a | |
parent | 1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff) |
Redefine Routing
As all needed information to route messege is contained inside of
KafkaSink message, we can simply put this object as part of single Route.
Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
27 files changed, 504 insertions, 445 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt index 5ffa39df..e5a83ac4 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt @@ -19,8 +19,8 @@ */ package org.onap.dcae.collectors.veshv.config.api.model -import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -data class Routing(val routes: List<Route>) +data class Route(val domain: String, val sink: KafkaSink) -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) +typealias Routing = List<Route> diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index c8a156c5..04bba7e2 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Either import arrow.core.None import arrow.core.Option -import arrow.core.Some import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding @@ -68,7 +65,7 @@ internal class ConfigurationValidator { // collectorConfiguration CollectorConfiguration(-1, "I do not exist. I'm not even a URL :o", - Routing(emptyList())), + emptyList()), // end TOD0 logLevel ) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index 37192868..4b89488b 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -185,5 +185,5 @@ internal object ConfigurationValidatorTest : Spek({ } }) -val emptyRouting = Routing(emptyList()) +val emptyRouting: Routing = emptyList() val someFromEmptyRouting = Some(emptyRouting) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 0a1e2d43..1b92d90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux -interface Sink { +interface Sink : Closeable { + fun send(message: RoutedMessage) = send(Flux.just(message)) + fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> } +interface SinkProvider : Closeable { + operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> +} + +typealias ConfigurationProvider = () -> Flux<Routing> + interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) @@ -42,11 +51,3 @@ interface Metrics { fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } - -interface SinkProvider : Closeable { - operator fun invoke(ctx: ClientContext): Sink -} - -interface ConfigurationProvider { - operator fun invoke(): Flux<Sequence<KafkaSink>> -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index fa4f9670..2b29acd9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Sequence<KafkaSink>>() + val config = AtomicReference<Routing>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(kafkaSinks, ctx), - sink = sinkProvider(ctx), + router = Router(routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) } } - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index a95a44d5..6e2e20f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,39 +19,75 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.None import arrow.core.toOption +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux -class Router(private val routing: Routing, private val ctx: ClientContext) { - - constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( - Routing( - kafkaSinks.map { Route(it.name(), it.topicName()) }.toList() - ), - ctx) +class Router internal constructor(private val routing: Routing, + private val messageSinks: Map<String, Lazy<Sink>>, + private val ctx: ClientContext, + private val metrics: Metrics) { + constructor(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext, + metrics: Metrics) : + this(routing, + constructMessageSinks(routing, sinkProvider, ctx), + ctx, + metrics) { + logger.debug(ctx::mdc) { "Routing for client: $routing" } + logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" } + } - fun findDestination(message: VesMessage): Option<RoutedMessage> = + fun route(message: VesMessage): Flux<ConsumedMessage> = routeFor(message.header) - .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } } - .map { it.routeMessage(message) } + .fold({ + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } + Flux.empty<Route>() + }, { + logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } + Flux.just(it) + }) + .flatMap { + val sinkTopic = it.sink.topicName() + messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) + } - private fun Route.routeMessage(message: VesMessage) = - RoutedMessage(targetTopic, partitioning(message.header), message) + private fun routeFor(header: CommonEventHeader) = + routing.find { it.domain == header.domain }.toOption() - private fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - routing.routes.find { it.domain == commonHeader.domain }.toOption() + private fun messageSinkFor(sinkTopic: String) = messageSinks + .getOrElse(sinkTopic) { + throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic") + } companion object { - private val logger = Logger(Routing::class) + private val logger = Logger(Router::class) + private val NONE_PARTITION = None + + internal fun constructMessageSinks(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext) = + routing.map(Route::sink) + .distinctBy { it.topicName() } + .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) } + + private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message) } + +internal class MissingMessageSinkException(msg: String) : Throwable(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 6a2792c3..433e4d57 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -53,7 +49,6 @@ internal class VesHvCollector( private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, - private val sink: Sink, private val metrics: Metrics) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = @@ -62,10 +57,10 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) } + // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here + .handleErrors() + .transform(::route) + .handleErrors() .doFinally { releaseBuffersMemory() } .then() @@ -98,18 +93,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux - .flatMap(this::findRoute) - .compose(sink::send) + private fun route(flux: Flux<VesMessage>) = flux + .flatMap(router::route) .doOnNext(this::updateSinkMetrics) - private fun findRoute(msg: VesMessage) = router - .findDestination(msg) - .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) } - .filterEmptyWithLog(logger, clientContext::fullMdc, - { "Found route for message: ${it.topic}, partition: ${it.partition}" }, - { "Could not find route for message" }) - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { when (consumedMessage) { is SuccessfullyConsumedMessage -> @@ -119,6 +106,11 @@ internal class VesHvCollector( } } + private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 10fe0a51..20b11753 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = - KafkaSinkProvider(config) + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 1f5df371..185693c0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<Sequence<KafkaSink>> = + override fun invoke(): Flux<Routing> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient + private fun handleUpdates(cbsClient: CbsClient) = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createCollectorConfiguration) + .map(::createRoutingDescription) .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } .retryWhen(retry) - - private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> = - try { - DataStreams.namedSinks(configuration) - .filter(StreamPredicates.streamOfType(StreamType.KAFKA)) - .map(streamParser::unsafeParse) - .asSequence() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - + private fun createRoutingDescription(configuration: JsonObject): Routing = try { + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 5052cc5c..2ce0f42f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext @@ -39,41 +40,39 @@ import reactor.kafka.sender.SenderRecord * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, - private val ctx: ClientContext) : Sink { +internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>, + private val ctx: ClientContext) : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = - messages.map(::vesToKafkaRecord).let { records -> - sender.send(records).map { - val msg = it.correlationMetadata() - if (it.exception() == null) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - "Message sent to Kafka with metadata: ${it.recordMetadata()}" + messages.map(::vesToKafkaRecord) + .compose { sender.send(it) } + .map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) + } else { + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } - SuccessfullyConsumedMessage(msg) - } else { - logger.warn(ctx::fullMdc, Marker.Invoke()) { - "Failed to send message to Kafka. Reason: ${it.exception().message}" - } - logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } - FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } - } - } private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> = SenderRecord.create( - routed.topic, - routed.partition, + routed.targetTopic, + routed.partition.orNull(), FILL_TIMESTAMP_LATER, routed.message.header, routed.message, routed) - internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender - companion object { private val FILL_TIMESTAMP_LATER: Long? = null - private val logger = Logger(KafkaSink::class) + private val logger = Logger(KafkaPublisher::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 96e45a02..7a498652 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,66 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.effects.IO -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.lang.Integer.max +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -internal class KafkaSinkProvider internal constructor( - private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider { - - constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) - - override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) +internal class KafkaSinkProvider : SinkProvider { + private val messageSinks = synchronizedMap( + mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>() + ) + + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { + KafkaPublisher(it, ctx) + } + } override fun close() = IO { - kafkaSender.close() - logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + messageSinks.values.forEach { it.close() } + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } } companion object { private val logger = Logger(KafkaSinkProvider::class) - private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f - private const val BUFFER_MEMORY_MULTIPLIER = 32 - private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - - private fun constructKafkaSender(config: CollectorConfiguration) = - KafkaSender.create(constructSenderOptions(config)) - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) - .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) - .producerProperty(RETRIES_CONFIG, 1) - .producerProperty(ACKS_CONFIG, "1") - .stopOnError(false) - - private fun maxRequestSize(maxRequestSizeBytes: Int) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - - private fun bufferMemory(maxRequestSizeBytes: Int) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt new file mode 100644 index 00000000..40de8c51 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.apache.kafka.clients.producer.ProducerConfig +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions + + +private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f +private const val BUFFER_MEMORY_MULTIPLIER = 32 +private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 + +internal fun createKafkaSender(sinkStream: SinkStream) = + (sinkStream as KafkaSink).let { kafkaSink -> + KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers()) + .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink)) + .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink)) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) + .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .stopOnError(false) + ) + } + +private fun maxRequestSize(kafkaSink: KafkaSink) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt() + +private fun bufferMemory(kafkaSink: KafkaSink) = + Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index b8b55865..6b9c6803 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -20,22 +20,30 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None -import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import reactor.core.publisher.Flux +import reactor.test.StepVerifier /** @@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame * @since May 2018 */ object RouterTest : Spek({ - given("sample configuration") { - val config = Routing(listOf( - Route(PERF3GPP.domainName, "ves_rtpm", { 2 }), - Route(SYSLOG.domainName, "ves_trace") - )) - val cut = Router(config, ClientContext()) - - on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() - } - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) - } + describe("Router") { - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) - } + whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } + val messageSinkMap = mapOf( + Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(syslogTopic, lazyOf(messageSinkMock)) + ) - on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + given("sample routing specification") { + val cut = router(defaultRouting, messageSinkMap) - it("should have route available") { - assertThat(result).isNotNull() - } + on("message with existing route (rtpm)") { + whenever(messageSinkMock.send(routedPerf3GppMessage)) + .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage)) - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) - } + it("should be properly routed") { + val result = cut.route(perf3gppMessage) - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) + assertThat(result).isNotNull() + StepVerifier.create(result) + .expectNext(successfullyConsumedPerf3gppMessage) + .verifyComplete() + + verify(perf3gppSinkMock).topicName() + verify(messageSinkMock).send(routedPerf3GppMessage) + } } - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + on("message with existing route (syslog)") { + whenever(messageSinkMock.send(routedSyslogMessage)) + .thenReturn(Flux.just(successfullyConsumedSyslogMessage)) + val result = cut.route(syslogMessage) + + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedSyslogMessage) + .verifyComplete() + + verify(syslogSinkMock).topicName() + verify(messageSinkMock).send(routedSyslogMessage) + } } - } - on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + on("message with unknown route") { + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val result = cut.route(message) - it("should not have route available") { - assertThat(result).isEqualTo(None) + it("should not have route available") { + StepVerifier.create(result).verifyComplete() + } } } } -})
\ No newline at end of file + +}) + +private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) = + Router(routing, kafkaPublisherMap, ClientContext(), mock()) + +private val perf3gppTopic = "PERF_PERF" +private val perf3gppSinkMock = mock<KafkaSink>() +private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) + +private val syslogTopic = "SYS_LOG" +private val syslogSinkMock = mock<KafkaSink>() +private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) + +private val messageSinkMock = mock<Sink>() +private val default_partition = None + +private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) +private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) + +private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) +private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt index 571a6680..8616ce03 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt @@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import reactor.core.publisher.Flux + import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({ .expectNoEvent(waitTime) } } - } + given("valid configuration from cbs") { val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) @@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val receivedSink1 = it.elementAt(0) - val receivedSink2 = it.elementAt(1) + val route1 = it.elementAt(0) + val route2 = it.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) assertThat(receivedSink1.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) assertThat(receivedSink2.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + }.verifyComplete() } } @@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({ }) + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + private val aafCredentials1 = ImmutableAafCredentials.builder() .username("client") .password("very secure password") @@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder() private val validConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", @@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse(""" "topic_name": "REG_HVVES_PERF3GPP" } }, - "perf3gpp_central": { + "$PERF3GPP_CENTRAL": { "type": "kafka", "aaf_credentials": { "username": "other_client", @@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse(""" private val invalidConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt deleted file mode 100644 index eb0a3173..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import arrow.syntax.collections.tail -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.ves.VesEventOuterClass -import reactor.kafka.sender.KafkaSender - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since December 2018 - */ -internal object KafkaSinkProviderTest : Spek({ - describe("non functional requirements") { - given("sample configuration") { - val config = CollectorConfiguration( - maxRequestSizeBytes = 1024 * 1024, - kafkaServers = "localhost:9090", - routing = Routing(emptyList()) - ) - - val cut = KafkaSinkProvider(config) - - on("sample clients") { - val clients = listOf( - ClientContext(), - ClientContext(), - ClientContext(), - ClientContext()) - - it("should create only one instance of KafkaSender") { - val sinks = clients.map(cut::invoke) - val firstSink = sinks[0] - val restOfSinks = sinks.tail() - - assertThat(restOfSinks).isNotEmpty - assertThat(restOfSinks).allSatisfy { sink -> - assertThat(firstSink.usesSameSenderAs(sink)) - .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") - .isTrue() - } - } - } - } - - given("dummy KafkaSender") { - val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock() - val cut = KafkaSinkProvider(kafkaSender) - - on("close") { - cut.close().unsafeRunSync() - - it("should close KafkaSender") { - verify(kafkaSender).close() - } - } - } - } -}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index a6b32ed9..92719e94 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -107,8 +107,8 @@ object MetricsSpecification : Spek({ assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC)) .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric") .isEqualTo(2) - assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)) - .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric") + assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC)) + .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric") .isEqualTo(1) } } @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 50fe098c..61a9a356 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec logger.info { "Processed $runs connections each containing $numMessages msgs." } - logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } + logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) @@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({ }) -private const val ONE_MILION = 1_000_000.0 +private const val ONE_MILLION = 1_000_000.0 private val rand = Random() private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 109915a1..ec540606 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext @@ -37,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -47,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : AutoCloseable { +class Sut(sink: Sink = StoringSink()) : Closeable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -59,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider) + healthStateProvider + ) + private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector @@ -67,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { throw IllegalStateException("Collector not available.") } - override fun close() { - collectorProvider.close().unsafeRunSync() + + fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) + return sink.sentMessages + } + + fun handleConnection(vararg packets: ByteBuf) { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) } + override fun close() = collectorProvider.close() + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } - class DummySinkProvider(private val sink: Sink) : SinkProvider { - private val active = AtomicBoolean(true) + private val sinkInitialized = AtomicBoolean(false) - override fun invoke(ctx: ClientContext) = sink - - override fun close() = IO { - active.set(false) + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + sinkInitialized.set(true) + sink } - val closed get() = !active.get() - + override fun close() = + if (sinkInitialized.get()) { + sink.close() + } else { + IO.unit + } } private val timeout = Duration.ofSeconds(10) -fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) - return sink.sentMessages -} - -fun Sut.handleConnection(vararg packets: ByteBuf) { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) -} - -fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 21c5c189..5d215fc5 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import arrow.core.None import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -65,12 +65,28 @@ object VesHvSpecification : Spek({ .hasSize(2) } + it("should create sink lazily") { + val (sut, sink) = vesHvWithStoringSink() + + // just connecting should not create sink + sut.handleConnection() + sut.close().unsafeRunSync() + + // then + assertThat(sink.closed).isFalse() + } + it("should close sink when closing collector provider") { - val (sut, _) = vesHvWithStoringSink() + val (sut, sink) = vesHvWithStoringSink() + // given Sink initialized + // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent + sut.handleConnection(vesWireFrameMessage(PERF3GPP)) - sut.close() + // when + sut.close().unsafeRunSync() - assertThat(sut.sinkProvider.closed).isTrue() + // then + assertThat(sink.closed).isTrue() } } @@ -145,14 +161,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) - assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None) } it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -161,14 +177,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(3) - assertThat(messages[0].topic).describedAs("first message topic") + assertThat(messages[0].targetTopic).describedAs("first message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[1].topic).describedAs("second message topic") + assertThat(messages[1].targetTopic).describedAs("second message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[2].topic).describedAs("last message topic") - .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + assertThat(messages[2].targetTopic).describedAs("last message topic") + .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) } it("should drop message if route was not found") { @@ -181,7 +197,7 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } @@ -205,7 +221,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,21 +229,21 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configWithEmptyRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] - assertThat(message.topic).describedAs("routed message topic after configuration's change") + assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(PERF3GPP_TOPIC) assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should change domain routing") { @@ -236,22 +252,22 @@ object VesHvSpecification : Spek({ assertThat(messages).hasSize(1) val firstMessage = messages[0] - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") .isEqualTo(PERF3GPP_TOPIC) assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should update routing for each client sending one message") { @@ -261,7 +277,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -269,8 +285,8 @@ object VesHvSpecification : Spek({ val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messagesAmount) assertThat(messagesForEachTopic) @@ -287,7 +303,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) println("config changed") } } @@ -297,8 +313,8 @@ object VesHvSpecification : Spek({ sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messageStreamSize) assertThat(firstTopicMessagesCount) @@ -320,7 +336,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +365,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index a398967d..c465fd91 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,67 +20,21 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcae.collectors.veshv.config.api.model.Routing import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException -const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" -const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" -const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" - -val configWithBasicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithTwoDomainsToOneTopicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(HEARTBEAT.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(MEASUREMENT.domainName) - .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithDifferentRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(ALTERNATE_PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithEmptyRouting = emptySequence<KafkaSink>() - - class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create() + private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() - fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(kafkaSinkSequence) + configStream.onNext(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index b599a076..a450b794 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ class FakeMetrics : Metrics { override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ - messagesSentToTopic.compute(msg.topic) { k, _ -> + messagesSentToTopic.compute(msg.targetTopic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt new file mode 100644 index 00000000..e9914ef1 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink + +const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" +const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" +const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092" +const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + +private val perf3gppKafkaSink = ImmutableKafkaSink.builder() + .name("PERF3GPP") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() +private val alternativeKafkaSink = ImmutableKafkaSink.builder() + .name("ALTERNATE") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() + + +val basicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink) +) + +val alternativeRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink) +) + +val twoDomainsToOneTopicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink), + Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink), + Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink) +) + +val emptyRouting: Routing = emptyList() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 51f724e0..160defdb 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage @@ -30,6 +31,7 @@ import reactor.core.publisher.Flux import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong /** @@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong */ class StoringSink : Sink { private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + private val active = AtomicBoolean(true) + val closed get() = !active.get() val sentMessages: List<RoutedMessage> get() = sent.toList() @@ -45,6 +49,13 @@ class StoringSink : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } + + /* + * TOD0: if the code would look like: + * ```IO { active.set(false) }``` + * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) + */ + override fun close() = active.set(false).run { IO.unit } } /** diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt index e4d147b1..04f9be63 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,4 +19,9 @@ */ package org.onap.dcae.collectors.veshv.domain -data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) +import arrow.core.Option + + +data class RoutedMessage(val message: VesMessage, + val targetTopic: String, + val partition: Option<Int>) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 2fb44768..c04c2c95 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { val now = Instant.now() sentMessages.increment() - sentMessagesByTopic(msg.topic).increment() + sentMessagesByTopic(msg.targetTopic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now)) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index d15dccef..aed4d928 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -59,9 +60,10 @@ object VesServer { private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = CollectorFactory( AdapterFactory.configurationProvider(config.cbs), - AdapterFactory.sinkCreatorFactory(config.collector), + AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes + config.server.maxPayloadSizeBytes, + HealthState.INSTANCE ) private fun logServerStarted(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index e452a5f4..f260f158 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.main +import arrow.core.Option import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge @@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame +import org.onap.ves.VesEventOuterClass import java.time.Instant import java.time.temporal.Temporal import java.util.concurrent.TimeUnit @@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({ }) fun routedMessage(topic: String, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } + vesEvent().run { toRoutedMessage(topic, partition) } fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) - } + vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = - vesEvent().let { evt -> - val builder = evt.toBuilder() + vesEvent().run { + val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 - builder.build() - }.let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - }
\ No newline at end of file + builder.build().toRoutedMessage(topic, partition) + } + +private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String, + partition: Int, + receivedAt: Temporal = Instant.now()) = + RoutedMessage( + VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)), + topic, + Option.just(partition) + ) + |