From 2174a045086e16611128b20a6d4357c04d9eac4a Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Tue, 26 Mar 2019 14:21:02 +0100 Subject: Redefine Routing As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka --- .../collectors/veshv/config/api/model/Routing.kt | 6 +- .../veshv/config/impl/ConfigurationValidator.kt | 5 +- .../config/impl/ConfigurationValidatorTest.kt | 2 +- .../dcae/collectors/veshv/boundary/adapters.kt | 23 +-- .../collectors/veshv/factory/CollectorFactory.kt | 10 +- .../org/onap/dcae/collectors/veshv/impl/Router.kt | 74 ++++++-- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 32 ++-- .../veshv/impl/adapters/AdapterFactory.kt | 4 +- .../impl/adapters/ConfigurationProviderImpl.kt | 33 ++-- .../veshv/impl/adapters/kafka/KafkaPublisher.kt | 78 ++++++++ .../veshv/impl/adapters/kafka/KafkaSink.kt | 79 -------- .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 60 ++---- .../org/onap/dcae/collectors/veshv/impl/kafka.kt | 56 ++++++ .../onap/dcae/collectors/veshv/impl/RouterTest.kt | 123 +++++++----- .../impl/adapters/ConfigurationProviderImplTest.kt | 207 +++++++++++++++++++++ .../impl/adapters/ConfigurationProviderTest.kt | 197 -------------------- .../impl/adapters/kafka/KafkaSinkProviderTest.kt | 88 --------- .../veshv/tests/component/MetricsSpecification.kt | 22 +-- .../tests/component/PerformanceSpecification.kt | 12 +- .../dcae/collectors/veshv/tests/component/Sut.kt | 63 ++++--- .../veshv/tests/component/VesHvSpecification.kt | 86 +++++---- .../collectors/veshv/tests/fakes/configuration.kt | 54 +----- .../dcae/collectors/veshv/tests/fakes/metrics.kt | 4 +- .../dcae/collectors/veshv/tests/fakes/routing.kt | 60 ++++++ .../onap/dcae/collectors/veshv/tests/fakes/sink.kt | 13 +- .../dcae/collectors/veshv/domain/RoutedMessage.kt | 9 +- .../veshv/main/metrics/MicrometerMetrics.kt | 4 +- .../collectors/veshv/main/servers/VesServer.kt | 6 +- .../collectors/veshv/main/MicrometerMetricsTest.kt | 33 ++-- 29 files changed, 751 insertions(+), 692 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt create mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt create mode 100644 sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt index 5ffa39df..e5a83ac4 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt @@ -19,8 +19,8 @@ */ package org.onap.dcae.collectors.veshv.config.api.model -import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -data class Routing(val routes: List) +data class Route(val domain: String, val sink: KafkaSink) -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) +typealias Routing = List diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index c8a156c5..04bba7e2 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Either import arrow.core.None import arrow.core.Option -import arrow.core.Some import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding @@ -68,7 +65,7 @@ internal class ConfigurationValidator { // collectorConfiguration CollectorConfiguration(-1, "I do not exist. I'm not even a URL :o", - Routing(emptyList())), + emptyList()), // end TOD0 logLevel ) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index 37192868..4b89488b 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -185,5 +185,5 @@ internal object ConfigurationValidatorTest : Spek({ } }) -val emptyRouting = Routing(emptyList()) +val emptyRouting: Routing = emptyList() val someFromEmptyRouting = Some(emptyRouting) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 0a1e2d43..1b92d90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux -interface Sink { +interface Sink : Closeable { + fun send(message: RoutedMessage) = send(Flux.just(message)) + fun send(messages: Flux): Flux } +interface SinkProvider : Closeable { + operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy +} + +typealias ConfigurationProvider = () -> Flux + interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) @@ -42,11 +51,3 @@ interface Metrics { fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } - -interface SinkProvider : Closeable { - operator fun invoke(ctx: ClientContext): Sink -} - -interface ConfigurationProvider { - operator fun invoke(): Flux> -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index fa4f9670..2b29acd9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference>() + val config = AtomicReference() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(kafkaSinks: Sequence, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(kafkaSinks, ctx), - sink = sinkProvider(ctx), + router = Router(routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) } } - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index a95a44d5..6e2e20f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,39 +19,75 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.None import arrow.core.toOption +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux -class Router(private val routing: Routing, private val ctx: ClientContext) { - - constructor(kafkaSinks: Sequence, ctx: ClientContext) : this( - Routing( - kafkaSinks.map { Route(it.name(), it.topicName()) }.toList() - ), - ctx) +class Router internal constructor(private val routing: Routing, + private val messageSinks: Map>, + private val ctx: ClientContext, + private val metrics: Metrics) { + constructor(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext, + metrics: Metrics) : + this(routing, + constructMessageSinks(routing, sinkProvider, ctx), + ctx, + metrics) { + logger.debug(ctx::mdc) { "Routing for client: $routing" } + logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" } + } - fun findDestination(message: VesMessage): Option = + fun route(message: VesMessage): Flux = routeFor(message.header) - .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } } - .map { it.routeMessage(message) } + .fold({ + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } + Flux.empty() + }, { + logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } + Flux.just(it) + }) + .flatMap { + val sinkTopic = it.sink.topicName() + messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) + } - private fun Route.routeMessage(message: VesMessage) = - RoutedMessage(targetTopic, partitioning(message.header), message) + private fun routeFor(header: CommonEventHeader) = + routing.find { it.domain == header.domain }.toOption() - private fun routeFor(commonHeader: CommonEventHeader): Option = - routing.routes.find { it.domain == commonHeader.domain }.toOption() + private fun messageSinkFor(sinkTopic: String) = messageSinks + .getOrElse(sinkTopic) { + throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic") + } companion object { - private val logger = Logger(Routing::class) + private val logger = Logger(Router::class) + private val NONE_PARTITION = None + + internal fun constructMessageSinks(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext) = + routing.map(Route::sink) + .distinctBy { it.topicName() } + .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) } + + private fun Lazy.send(message: RoutedMessage) = value.send(message) } + +internal class MissingMessageSinkException(msg: String) : Throwable(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 6a2792c3..433e4d57 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -53,7 +49,6 @@ internal class VesHvCollector( private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, - private val sink: Sink, private val metrics: Metrics) : Collector { override fun handleConnection(dataStream: Flux): Mono = @@ -62,10 +57,10 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) } + // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here + .handleErrors() + .transform(::route) + .handleErrors() .doFinally { releaseBuffersMemory() } .then() @@ -98,18 +93,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux): Flux = flux - .flatMap(this::findRoute) - .compose(sink::send) + private fun route(flux: Flux) = flux + .flatMap(router::route) .doOnNext(this::updateSinkMetrics) - private fun findRoute(msg: VesMessage) = router - .findDestination(msg) - .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) } - .filterEmptyWithLog(logger, clientContext::fullMdc, - { "Found route for message: ${it.topic}, partition: ${it.partition}" }, - { "Could not find route for message" }) - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { when (consumedMessage) { is SuccessfullyConsumedMessage -> @@ -119,6 +106,11 @@ internal class VesHvCollector( } } + private fun Flux.handleErrors(): Flux = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 10fe0a51..20b11753 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = - KafkaSinkProvider(config) + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 1f5df371..185693c0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono> = + override fun invoke(): Flux = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono> = cbsClient + private fun handleUpdates(cbsClient: CbsClient) = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createCollectorConfiguration) + .map(::createRoutingDescription) .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } .retryWhen(retry) - - private fun createCollectorConfiguration(configuration: JsonObject): Sequence = - try { - DataStreams.namedSinks(configuration) - .filter(StreamPredicates.streamOfType(StreamType.KAFKA)) - .map(streamParser::unsafeParse) - .asSequence() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - + private fun createRoutingDescription(configuration: JsonObject): Routing = try { + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt new file mode 100644 index 00000000..2ce0f42f --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderRecord + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class KafkaPublisher(private val sender: KafkaSender, + private val ctx: ClientContext) : Sink { + + override fun send(messages: Flux): Flux = + messages.map(::vesToKafkaRecord) + .compose { sender.send(it) } + .map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) + } else { + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) + } + } + + private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord = + SenderRecord.create( + routed.targetTopic, + routed.partition.orNull(), + FILL_TIMESTAMP_LATER, + routed.message.header, + routed.message, + routed) + + companion object { + private val FILL_TIMESTAMP_LATER: Long? = null + private val logger = Logger(KafkaPublisher::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt deleted file mode 100644 index 5052cc5c..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ConsumedMessage -import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage -import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.core.publisher.Flux -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderRecord - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class KafkaSink(private val sender: KafkaSender, - private val ctx: ClientContext) : Sink { - - override fun send(messages: Flux): Flux = - messages.map(::vesToKafkaRecord).let { records -> - sender.send(records).map { - val msg = it.correlationMetadata() - if (it.exception() == null) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - "Message sent to Kafka with metadata: ${it.recordMetadata()}" - } - SuccessfullyConsumedMessage(msg) - } else { - logger.warn(ctx::fullMdc, Marker.Invoke()) { - "Failed to send message to Kafka. Reason: ${it.exception().message}" - } - logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } - FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) - } - } - } - - private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord = - SenderRecord.create( - routed.topic, - routed.partition, - FILL_TIMESTAMP_LATER, - routed.message.header, - routed.message, - routed) - - internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender - - companion object { - private val FILL_TIMESTAMP_LATER: Long? = null - private val logger = Logger(KafkaSink::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 96e45a02..7a498652 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,66 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.effects.IO -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.lang.Integer.max +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk * @since June 2018 */ -internal class KafkaSinkProvider internal constructor( - private val kafkaSender: KafkaSender) : SinkProvider { - - constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) - - override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) +internal class KafkaSinkProvider : SinkProvider { + private val messageSinks = synchronizedMap( + mutableMapOf>() + ) + + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { + KafkaPublisher(it, ctx) + } + } override fun close() = IO { - kafkaSender.close() - logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + messageSinks.values.forEach { it.close() } + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } } companion object { private val logger = Logger(KafkaSinkProvider::class) - private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f - private const val BUFFER_MEMORY_MULTIPLIER = 32 - private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - - private fun constructKafkaSender(config: CollectorConfiguration) = - KafkaSender.create(constructSenderOptions(config)) - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) - .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) - .producerProperty(RETRIES_CONFIG, 1) - .producerProperty(ACKS_CONFIG, "1") - .stopOnError(false) - - private fun maxRequestSize(maxRequestSizeBytes: Int) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - - private fun bufferMemory(maxRequestSizeBytes: Int) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt new file mode 100644 index 00000000..40de8c51 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.apache.kafka.clients.producer.ProducerConfig +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions + + +private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f +private const val BUFFER_MEMORY_MULTIPLIER = 32 +private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 + +internal fun createKafkaSender(sinkStream: SinkStream) = + (sinkStream as KafkaSink).let { kafkaSink -> + KafkaSender.create(SenderOptions.create() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers()) + .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink)) + .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink)) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) + .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .stopOnError(false) + ) + } + +private fun maxRequestSize(kafkaSink: KafkaSink) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt() + +private fun bufferMemory(kafkaSink: KafkaSink) = + Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index b8b55865..6b9c6803 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -20,22 +20,30 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None -import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import reactor.core.publisher.Flux +import reactor.test.StepVerifier /** @@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame * @since May 2018 */ object RouterTest : Spek({ - given("sample configuration") { - val config = Routing(listOf( - Route(PERF3GPP.domainName, "ves_rtpm", { 2 }), - Route(SYSLOG.domainName, "ves_trace") - )) - val cut = Router(config, ClientContext()) - - on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() - } - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) - } + describe("Router") { - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) - } + whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } + val messageSinkMap = mapOf( + Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(syslogTopic, lazyOf(messageSinkMock)) + ) - on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + given("sample routing specification") { + val cut = router(defaultRouting, messageSinkMap) - it("should have route available") { - assertThat(result).isNotNull() - } + on("message with existing route (rtpm)") { + whenever(messageSinkMock.send(routedPerf3GppMessage)) + .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage)) - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) - } + it("should be properly routed") { + val result = cut.route(perf3gppMessage) - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) + assertThat(result).isNotNull() + StepVerifier.create(result) + .expectNext(successfullyConsumedPerf3gppMessage) + .verifyComplete() + + verify(perf3gppSinkMock).topicName() + verify(messageSinkMock).send(routedPerf3GppMessage) + } } - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + on("message with existing route (syslog)") { + whenever(messageSinkMock.send(routedSyslogMessage)) + .thenReturn(Flux.just(successfullyConsumedSyslogMessage)) + val result = cut.route(syslogMessage) + + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedSyslogMessage) + .verifyComplete() + + verify(syslogSinkMock).topicName() + verify(messageSinkMock).send(routedSyslogMessage) + } } - } - on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + on("message with unknown route") { + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val result = cut.route(message) - it("should not have route available") { - assertThat(result).isEqualTo(None) + it("should not have route available") { + StepVerifier.create(result).verifyComplete() + } } } } -}) \ No newline at end of file + +}) + +private fun router(routing: Routing, kafkaPublisherMap: Map>) = + Router(routing, kafkaPublisherMap, ClientContext(), mock()) + +private val perf3gppTopic = "PERF_PERF" +private val perf3gppSinkMock = mock() +private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) + +private val syslogTopic = "SYS_LOG" +private val syslogSinkMock = mock() +private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) + +private val messageSinkMock = mock() +private val default_partition = None + +private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) +private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) + +private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) +private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage) \ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt new file mode 100644 index 00000000..8616ce03 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt @@ -0,0 +1,207 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.google.gson.JsonParser +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import reactor.core.publisher.Flux + +import reactor.core.publisher.Mono +import reactor.retry.Retry +import reactor.test.StepVerifier +import java.time.Duration + +/** + * @author Jakub Dudycz + * @since May 2018 + */ +internal object ConfigurationProviderImplTest : Spek({ + + describe("Configuration provider") { + + val cbsClient: CbsClient = mock() + val cbsClientMock: Mono = Mono.just(cbsClient) + val healthStateProvider = HealthState.INSTANCE + + given("configuration is never in cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + + on("waiting for configuration") { + val waitTime = Duration.ofMillis(100) + + it("should not get it") { + StepVerifier.create(configProvider().take(1)) + .expectNoEvent(waitTime) + } + } + } + + given("valid configuration from cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(validConfiguration)) + it("should use received configuration") { + + StepVerifier.create(configProvider().take(1)) + .consumeNextWith { + val route1 = it.elementAt(0) + val route2 = it.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) + assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(receivedSink1.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") + assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) + assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(receivedSink2.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") + assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + + }.verifyComplete() + } + } + + } + given("invalid configuration from cbs") { + val iterationCount = 3L + val configProvider = constructConfigurationProvider( + cbsClientMock, healthStateProvider, iterationCount + ) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(invalidConfiguration)) + + it("should interrupt the flux") { + StepVerifier.create(configProvider()) + .verifyError() + } + + it("should update the health state") { + StepVerifier.create(healthStateProvider().take(iterationCount)) + .expectNextCount(iterationCount - 1) + .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + .verifyComplete() + } + } + } + } + +}) + + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + +private val aafCredentials1 = ImmutableAafCredentials.builder() + .username("client") + .password("very secure password") + .build() + +private val aafCredentials2 = ImmutableAafCredentials.builder() + .username("other_client") + .password("another very secure password") + .build() + +private val validConfiguration = JsonParser().parse(""" +{ + "streams_publishes": { + "$PERF3GPP_REGIONAL": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "topic_name": "REG_HVVES_PERF3GPP" + } + }, + "$PERF3GPP_CENTRAL": { + "type": "kafka", + "aaf_credentials": { + "username": "other_client", + "password": "another very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", + "topic_name": "CEN_HVVES_PERF3GPP" + } + } + } +}""").asJsonObject + +private val invalidConfiguration = JsonParser().parse(""" +{ + "streams_publishes": { + "$PERF3GPP_REGIONAL": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "popic_name": "REG_HVVES_PERF3GPP" + } + } + } +}""").asJsonObject + +private val firstRequestDelay = Duration.ofMillis(1) +private val requestInterval = Duration.ofMillis(1) +private val streamParser = StreamFromGsonParsers.kafkaSinkParser() + +private fun constructConfigurationProvider(cbsClientMono: Mono, + healthState: HealthState, + iterationCount: Long = 1 +): ConfigurationProviderImpl { + + val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return ConfigurationProviderImpl( + cbsClientMono, + firstRequestDelay, + requestInterval, + healthState, + streamParser, + retry + ) +} diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt deleted file mode 100644 index 571a6680..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ /dev/null @@ -1,197 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.google.gson.JsonParser -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Retry -import reactor.test.StepVerifier -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal object ConfigurationProviderImplTest : Spek({ - - describe("Configuration provider") { - - val cbsClient: CbsClient = mock() - val cbsClientMock: Mono = Mono.just(cbsClient) - val healthStateProvider = HealthState.INSTANCE - - given("configuration is never in cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - - on("waiting for configuration") { - val waitTime = Duration.ofMillis(100) - - it("should not get it") { - StepVerifier.create(configProvider().take(1)) - .expectNoEvent(waitTime) - } - } - - } - given("valid configuration from cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - - on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) - .thenReturn(Flux.just(validConfiguration)) - it("should use received configuration") { - - StepVerifier.create(configProvider().take(1)) - .consumeNextWith { - val receivedSink1 = it.elementAt(0) - val receivedSink2 = it.elementAt(1) - - assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) - assertThat(receivedSink1.bootstrapServers()) - .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") - assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") - - assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) - assertThat(receivedSink2.bootstrapServers()) - .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") - assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") - }.verifyComplete() - } - } - - } - given("invalid configuration from cbs") { - val iterationCount = 3L - val configProvider = constructConfigurationProvider( - cbsClientMock, healthStateProvider, iterationCount - ) - - on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) - .thenReturn(Flux.just(invalidConfiguration)) - - it("should interrupt the flux") { - StepVerifier.create(configProvider()) - .verifyError() - } - - it("should update the health state") { - StepVerifier.create(healthStateProvider().take(iterationCount)) - .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - .verifyComplete() - } - } - } - } - -}) - -private val aafCredentials1 = ImmutableAafCredentials.builder() - .username("client") - .password("very secure password") - .build() - -private val aafCredentials2 = ImmutableAafCredentials.builder() - .username("other_client") - .password("another very secure password") - .build() - -private val validConfiguration = JsonParser().parse(""" -{ - "streams_publishes": { - "perf3gpp_regional": { - "type": "kafka", - "aaf_credentials": { - "username": "client", - "password": "very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "topic_name": "REG_HVVES_PERF3GPP" - } - }, - "perf3gpp_central": { - "type": "kafka", - "aaf_credentials": { - "username": "other_client", - "password": "another very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", - "topic_name": "CEN_HVVES_PERF3GPP" - } - } - } -}""").asJsonObject - -private val invalidConfiguration = JsonParser().parse(""" -{ - "streams_publishes": { - "perf3gpp_regional": { - "type": "kafka", - "aaf_credentials": { - "username": "client", - "password": "very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "popic_name": "REG_HVVES_PERF3GPP" - } - } - } -}""").asJsonObject - -private val firstRequestDelay = Duration.ofMillis(1) -private val requestInterval = Duration.ofMillis(1) -private val streamParser = StreamFromGsonParsers.kafkaSinkParser() - -private fun constructConfigurationProvider(cbsClientMono: Mono, - healthState: HealthState, - iterationCount: Long = 1 -): ConfigurationProviderImpl { - - val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - - return ConfigurationProviderImpl( - cbsClientMono, - firstRequestDelay, - requestInterval, - healthState, - streamParser, - retry - ) -} diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt deleted file mode 100644 index eb0a3173..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import arrow.syntax.collections.tail -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.ves.VesEventOuterClass -import reactor.kafka.sender.KafkaSender - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -internal object KafkaSinkProviderTest : Spek({ - describe("non functional requirements") { - given("sample configuration") { - val config = CollectorConfiguration( - maxRequestSizeBytes = 1024 * 1024, - kafkaServers = "localhost:9090", - routing = Routing(emptyList()) - ) - - val cut = KafkaSinkProvider(config) - - on("sample clients") { - val clients = listOf( - ClientContext(), - ClientContext(), - ClientContext(), - ClientContext()) - - it("should create only one instance of KafkaSender") { - val sinks = clients.map(cut::invoke) - val firstSink = sinks[0] - val restOfSinks = sinks.tail() - - assertThat(restOfSinks).isNotEmpty - assertThat(restOfSinks).allSatisfy { sink -> - assertThat(firstSink.usesSameSenderAs(sink)) - .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") - .isTrue() - } - } - } - } - - given("dummy KafkaSender") { - val kafkaSender: KafkaSender = mock() - val cut = KafkaSinkProvider(kafkaSender) - - on("close") { - cut.close().unsafeRunSync() - - it("should close KafkaSender") { - verify(kafkaSender).close() - } - } - } - } -}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index a6b32ed9..92719e94 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -107,8 +107,8 @@ object MetricsSpecification : Spek({ assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC)) .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric") .isEqualTo(2) - assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)) - .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric") + assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC)) + .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric") .isEqualTo(1) } } @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 50fe098c..61a9a356 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec logger.info { "Processed $runs connections each containing $numMessages msgs." } - logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } + logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) @@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({ }) -private const val ONE_MILION = 1_000_000.0 +private const val ONE_MILLION = 1_000_000.0 private val rand = Random() private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 109915a1..ec540606 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext @@ -37,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -47,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : AutoCloseable { +class Sut(sink: Sink = StoringSink()) : Closeable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -59,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider) + healthStateProvider + ) + private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector @@ -67,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { throw IllegalStateException("Collector not available.") } - override fun close() { - collectorProvider.close().unsafeRunSync() + + fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) + return sink.sentMessages + } + + fun handleConnection(vararg packets: ByteBuf) { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) } + override fun close() = collectorProvider.close() + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } - class DummySinkProvider(private val sink: Sink) : SinkProvider { - private val active = AtomicBoolean(true) + private val sinkInitialized = AtomicBoolean(false) - override fun invoke(ctx: ClientContext) = sink - - override fun close() = IO { - active.set(false) + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + sinkInitialized.set(true) + sink } - val closed get() = !active.get() - + override fun close() = + if (sinkInitialized.get()) { + sink.close() + } else { + IO.unit + } } private val timeout = Duration.ofSeconds(10) -fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) - return sink.sentMessages -} - -fun Sut.handleConnection(vararg packets: ByteBuf) { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) -} - -fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence = configWithBasicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence = configWithBasicRouting): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence = configWithBasicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 21c5c189..5d215fc5 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import arrow.core.None import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -65,12 +65,28 @@ object VesHvSpecification : Spek({ .hasSize(2) } + it("should create sink lazily") { + val (sut, sink) = vesHvWithStoringSink() + + // just connecting should not create sink + sut.handleConnection() + sut.close().unsafeRunSync() + + // then + assertThat(sink.closed).isFalse() + } + it("should close sink when closing collector provider") { - val (sut, _) = vesHvWithStoringSink() + val (sut, sink) = vesHvWithStoringSink() + // given Sink initialized + // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent + sut.handleConnection(vesWireFrameMessage(PERF3GPP)) - sut.close() + // when + sut.close().unsafeRunSync() - assertThat(sut.sinkProvider.closed).isTrue() + // then + assertThat(sink.closed).isTrue() } } @@ -145,14 +161,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) - assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None) } it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -161,14 +177,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(3) - assertThat(messages[0].topic).describedAs("first message topic") + assertThat(messages[0].targetTopic).describedAs("first message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[1].topic).describedAs("second message topic") + assertThat(messages[1].targetTopic).describedAs("second message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[2].topic).describedAs("last message topic") - .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + assertThat(messages[2].targetTopic).describedAs("last message topic") + .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) } it("should drop message if route was not found") { @@ -181,7 +197,7 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } @@ -205,7 +221,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,21 +229,21 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configWithEmptyRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] - assertThat(message.topic).describedAs("routed message topic after configuration's change") + assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(PERF3GPP_TOPIC) assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should change domain routing") { @@ -236,22 +252,22 @@ object VesHvSpecification : Spek({ assertThat(messages).hasSize(1) val firstMessage = messages[0] - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") .isEqualTo(PERF3GPP_TOPIC) assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should update routing for each client sending one message") { @@ -261,7 +277,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -269,8 +285,8 @@ object VesHvSpecification : Spek({ val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messagesAmount) assertThat(messagesForEachTopic) @@ -287,7 +303,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) println("config changed") } } @@ -297,8 +313,8 @@ object VesHvSpecification : Spek({ sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messageStreamSize) assertThat(firstTopicMessagesCount) @@ -320,7 +336,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +365,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index a398967d..c465fd91 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,67 +20,21 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcae.collectors.veshv.config.api.model.Routing import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException -const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" -const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" -const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" - -val configWithBasicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithTwoDomainsToOneTopicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(HEARTBEAT.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(MEASUREMENT.domainName) - .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithDifferentRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(ALTERNATE_PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithEmptyRouting = emptySequence() - - class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor, Sequence> = UnicastProcessor.create() + private val configStream: FluxProcessor = UnicastProcessor.create() - fun updateConfiguration(kafkaSinkSequence: Sequence) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(kafkaSinkSequence) + configStream.onNext(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index b599a076..a450b794 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ class FakeMetrics : Metrics { override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ - messagesSentToTopic.compute(msg.topic) { k, _ -> + messagesSentToTopic.compute(msg.targetTopic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt new file mode 100644 index 00000000..e9914ef1 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink + +const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" +const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" +const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092" +const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + +private val perf3gppKafkaSink = ImmutableKafkaSink.builder() + .name("PERF3GPP") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() +private val alternativeKafkaSink = ImmutableKafkaSink.builder() + .name("ALTERNATE") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() + + +val basicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink) +) + +val alternativeRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink) +) + +val twoDomainsToOneTopicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink), + Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink), + Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink) +) + +val emptyRouting: Routing = emptyList() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 51f724e0..160defdb 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage @@ -30,6 +31,7 @@ import reactor.core.publisher.Flux import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong /** @@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong */ class StoringSink : Sink { private val sent: Deque = ConcurrentLinkedDeque() + private val active = AtomicBoolean(true) + val closed get() = !active.get() val sentMessages: List get() = sent.toList() @@ -45,6 +49,13 @@ class StoringSink : Sink { override fun send(messages: Flux): Flux { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } + + /* + * TOD0: if the code would look like: + * ```IO { active.set(false) }``` + * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) + */ + override fun close() = active.set(false).run { IO.unit } } /** diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt index e4d147b1..04f9be63 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,4 +19,9 @@ */ package org.onap.dcae.collectors.veshv.domain -data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) +import arrow.core.Option + + +data class RoutedMessage(val message: VesMessage, + val targetTopic: String, + val partition: Option) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 2fb44768..c04c2c95 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { val now = Instant.now() sentMessages.increment() - sentMessagesByTopic(msg.topic).increment() + sentMessagesByTopic(msg.targetTopic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now)) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index d15dccef..aed4d928 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -59,9 +60,10 @@ object VesServer { private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = CollectorFactory( AdapterFactory.configurationProvider(config.cbs), - AdapterFactory.sinkCreatorFactory(config.collector), + AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes + config.server.maxPayloadSizeBytes, + HealthState.INSTANCE ) private fun logServerStarted(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index e452a5f4..f260f158 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.main +import arrow.core.Option import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge @@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame +import org.onap.ves.VesEventOuterClass import java.time.Instant import java.time.temporal.Temporal import java.util.concurrent.TimeUnit @@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({ }) fun routedMessage(topic: String, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } + vesEvent().run { toRoutedMessage(topic, partition) } fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) - } + vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = - vesEvent().let { evt -> - val builder = evt.toBuilder() + vesEvent().run { + val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 - builder.build() - }.let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } \ No newline at end of file + builder.build().toRoutedMessage(topic, partition) + } + +private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String, + partition: Int, + receivedAt: Temporal = Instant.now()) = + RoutedMessage( + VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)), + topic, + Option.just(partition) + ) + -- cgit 1.2.3-korg