aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-26 14:21:02 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-28 14:16:02 +0100
commit2174a045086e16611128b20a6d4357c04d9eac4a (patch)
tree6302837fc6ce5fac26a9da91e7353247c397bc0a
parent1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff)
Redefine Routing
As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt6
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt5
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt23
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt74
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt32
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt)45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt60
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt56
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt123
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt)22
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt88
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt22
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt12
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt63
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt86
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt54
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt60
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt13
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt9
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt33
27 files changed, 504 insertions, 445 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
index 5ffa39df..e5a83ac4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
@@ -19,8 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.config.api.model
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-data class Routing(val routes: List<Route>)
+data class Route(val domain: String, val sink: KafkaSink)
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 })
+typealias Routing = List<Route>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index c8a156c5..04bba7e2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Either
import arrow.core.None
import arrow.core.Option
-import arrow.core.Some
import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
@@ -68,7 +65,7 @@ internal class ConfigurationValidator {
// collectorConfiguration
CollectorConfiguration(-1,
"I do not exist. I'm not even a URL :o",
- Routing(emptyList())),
+ emptyList()),
// end TOD0
logLevel
)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index 37192868..4b89488b 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -185,5 +185,5 @@ internal object ConfigurationValidatorTest : Spek({
}
})
-val emptyRouting = Routing(emptyList())
+val emptyRouting: Routing = emptyList()
val someFromEmptyRouting = Some(emptyRouting)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 0a1e2d43..1b92d90c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
-interface Sink {
+interface Sink : Closeable {
+ fun send(message: RoutedMessage) = send(Flux.just(message))
+
fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
+interface SinkProvider : Closeable {
+ operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
+}
+
+typealias ConfigurationProvider = () -> Flux<Routing>
+
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
@@ -42,11 +51,3 @@ interface Metrics {
fun notifyClientConnected()
fun notifyClientRejected(cause: ClientRejectionCause)
}
-
-interface SinkProvider : Closeable {
- operator fun invoke(ctx: ClientContext): Sink
-}
-
-interface ConfigurationProvider {
- operator fun invoke(): Flux<Sequence<KafkaSink>>
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index fa4f9670..2b29acd9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import java.util.concurrent.atomic.AtomicReference
/**
@@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Sequence<KafkaSink>>()
+ val config = AtomicReference<Routing>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(kafkaSinks, ctx),
- sink = sinkProvider(ctx),
+ router = Router(routing, sinkProvider, ctx, metrics),
metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
}
}
-
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index a95a44d5..6e2e20f7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -19,39 +19,75 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
+import arrow.core.None
import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
-class Router(private val routing: Routing, private val ctx: ClientContext) {
-
- constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
- Routing(
- kafkaSinks.map { Route(it.name(), it.topicName()) }.toList()
- ),
- ctx)
+class Router internal constructor(private val routing: Routing,
+ private val messageSinks: Map<String, Lazy<Sink>>,
+ private val ctx: ClientContext,
+ private val metrics: Metrics) {
+ constructor(routing: Routing,
+ sinkProvider: SinkProvider,
+ ctx: ClientContext,
+ metrics: Metrics) :
+ this(routing,
+ constructMessageSinks(routing, sinkProvider, ctx),
+ ctx,
+ metrics) {
+ logger.debug(ctx::mdc) { "Routing for client: $routing" }
+ logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" }
+ }
- fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } }
- .map { it.routeMessage(message) }
+ .fold({
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
+ Flux.empty<Route>()
+ }, {
+ logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
+ Flux.just(it)
+ })
+ .flatMap {
+ val sinkTopic = it.sink.topicName()
+ messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
+ }
- private fun Route.routeMessage(message: VesMessage) =
- RoutedMessage(targetTopic, partitioning(message.header), message)
+ private fun routeFor(header: CommonEventHeader) =
+ routing.find { it.domain == header.domain }.toOption()
- private fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- routing.routes.find { it.domain == commonHeader.domain }.toOption()
+ private fun messageSinkFor(sinkTopic: String) = messageSinks
+ .getOrElse(sinkTopic) {
+ throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
+ }
companion object {
- private val logger = Logger(Routing::class)
+ private val logger = Logger(Router::class)
+ private val NONE_PARTITION = None
+
+ internal fun constructMessageSinks(routing: Routing,
+ sinkProvider: SinkProvider,
+ ctx: ClientContext) =
+ routing.map(Route::sink)
+ .distinctBy { it.topicName() }
+ .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
}
+
+ private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
}
+
+internal class MissingMessageSinkException(msg: String) : Throwable(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 6a2792c3..433e4d57 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
-import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -53,7 +49,6 @@ internal class VesHvCollector(
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val router: Router,
- private val sink: Sink,
private val metrics: Metrics) : Collector {
override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
@@ -62,10 +57,10 @@ internal class VesHvCollector(
.transform(::filterInvalidWireFrame)
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
- .transform(::routeMessage)
- .onErrorResume {
- metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
- logger.handleReactiveStreamError(clientContext, it) }
+ // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
+ .handleErrors()
+ .transform(::route)
+ .handleErrors()
.doFinally { releaseBuffersMemory() }
.then()
@@ -98,18 +93,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
- .flatMap(this::findRoute)
- .compose(sink::send)
+ private fun route(flux: Flux<VesMessage>) = flux
+ .flatMap(router::route)
.doOnNext(this::updateSinkMetrics)
- private fun findRoute(msg: VesMessage) = router
- .findDestination(msg)
- .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
- .filterEmptyWithLog(logger, clientContext::fullMdc,
- { "Found route for message: ${it.topic}, partition: ${it.partition}" },
- { "Could not find route for message" })
-
private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
when (consumedMessage) {
is SuccessfullyConsumedMessage ->
@@ -119,6 +106,11 @@ internal class VesHvCollector(
}
}
+ private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
+ metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
+ logger.handleReactiveStreamError(clientContext, it)
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 10fe0a51..20b11753 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
@@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
- KafkaSinkProvider(config)
+ fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
ConfigurationProviderImpl(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index 1f5df371..185693c0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<Sequence<KafkaSink>> =
+ override fun invoke(): Flux<Routing> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient) = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
firstRequestDelay,
requestInterval)
.doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
- .map(::createCollectorConfiguration)
+ .map(::createRoutingDescription)
.onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
.retryWhen(retry)
-
- private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
- try {
- DataStreams.namedSinks(configuration)
- .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
- .map(streamParser::unsafeParse)
- .asSequence()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
+ private fun createRoutingDescription(configuration: JsonObject): Routing = try {
+ DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .map { Route(it.name(), it) }
+ .asIterable()
+ .toList()
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
companion object {
private const val MAX_RETRIES = 5L
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 5052cc5c..2ce0f42f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -39,41 +40,39 @@ import reactor.kafka.sender.SenderRecord
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
- private val ctx: ClientContext) : Sink {
+internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+ private val ctx: ClientContext) : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
- messages.map(::vesToKafkaRecord).let { records ->
- sender.send(records).map {
- val msg = it.correlationMetadata()
- if (it.exception() == null) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ messages.map(::vesToKafkaRecord)
+ .compose { sender.send(it) }
+ .map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
+ } else {
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
- SuccessfullyConsumedMessage(msg)
- } else {
- logger.warn(ctx::fullMdc, Marker.Invoke()) {
- "Failed to send message to Kafka. Reason: ${it.exception().message}"
- }
- logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
- FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
- }
- }
private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
SenderRecord.create(
- routed.topic,
- routed.partition,
+ routed.targetTopic,
+ routed.partition.orNull(),
FILL_TIMESTAMP_LATER,
routed.message.header,
routed.message,
routed)
- internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
-
companion object {
private val FILL_TIMESTAMP_LATER: Long? = null
- private val logger = Logger(KafkaSink::class)
+ private val logger = Logger(KafkaPublisher::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 96e45a02..7a498652 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,66 +20,38 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.effects.IO
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.createKafkaSender
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.lang.Integer.max
+import java.util.Collections.synchronizedMap
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider internal constructor(
- private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
-
- constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
-
- override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+internal class KafkaSinkProvider : SinkProvider {
+ private val messageSinks = synchronizedMap(
+ mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
+ )
+
+ override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+ messageSinks.computeIfAbsent(stream, ::createKafkaSender).let {
+ KafkaPublisher(it, ctx)
+ }
+ }
override fun close() = IO {
- kafkaSender.close()
- logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+ messageSinks.values.forEach { it.close() }
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
}
companion object {
private val logger = Logger(KafkaSinkProvider::class)
- private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
- private const val BUFFER_MEMORY_MULTIPLIER = 32
- private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
-
- private fun constructKafkaSender(config: CollectorConfiguration) =
- KafkaSender.create(constructSenderOptions(config))
-
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
- .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
- .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
- .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
- .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
- .producerProperty(RETRIES_CONFIG, 1)
- .producerProperty(ACKS_CONFIG, "1")
- .stopOnError(false)
-
- private fun maxRequestSize(maxRequestSizeBytes: Int) =
- (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
-
- private fun bufferMemory(maxRequestSizeBytes: Int) =
- max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
new file mode 100644
index 00000000..40de8c51
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+
+
+private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
+private const val BUFFER_MEMORY_MULTIPLIER = 32
+private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
+
+internal fun createKafkaSender(sinkStream: SinkStream) =
+ (sinkStream as KafkaSink).let { kafkaSink ->
+ KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers())
+ .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink))
+ .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink))
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
+ .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
+ .stopOnError(false)
+ )
+ }
+
+private fun maxRequestSize(kafkaSink: KafkaSink) =
+ (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt()
+
+private fun bufferMemory(kafkaSink: KafkaSink) =
+ Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes())
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index b8b55865..6b9c6803 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -20,22 +20,30 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
-import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
/**
@@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
* @since May 2018
*/
object RouterTest : Spek({
- given("sample configuration") {
- val config = Routing(listOf(
- Route(PERF3GPP.domainName, "ves_rtpm", { 2 }),
- Route(SYSLOG.domainName, "ves_trace")
- ))
- val cut = Router(config, ClientContext())
-
- on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
-
- it("should have route available") {
- assertThat(result).isNotNull()
- }
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
- }
+ describe("Router") {
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
- }
+ whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+ whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
- }
- }
+ val messageSinkMap = mapOf(
+ Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+ Pair(syslogTopic, lazyOf(messageSinkMock))
+ )
- on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
+ given("sample routing specification") {
+ val cut = router(defaultRouting, messageSinkMap)
- it("should have route available") {
- assertThat(result).isNotNull()
- }
+ on("message with existing route (rtpm)") {
+ whenever(messageSinkMock.send(routedPerf3GppMessage))
+ .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
- }
+ it("should be properly routed") {
+ val result = cut.route(perf3gppMessage)
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
+ assertThat(result).isNotNull()
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedPerf3gppMessage)
+ .verifyComplete()
+
+ verify(perf3gppSinkMock).topicName()
+ verify(messageSinkMock).send(routedPerf3GppMessage)
+ }
}
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ on("message with existing route (syslog)") {
+ whenever(messageSinkMock.send(routedSyslogMessage))
+ .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
+ val result = cut.route(syslogMessage)
+
+ it("should be properly routed") {
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedSyslogMessage)
+ .verifyComplete()
+
+ verify(syslogSinkMock).topicName()
+ verify(messageSinkMock).send(routedSyslogMessage)
+ }
}
- }
- on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
+ on("message with unknown route") {
+ val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+ val result = cut.route(message)
- it("should not have route available") {
- assertThat(result).isEqualTo(None)
+ it("should not have route available") {
+ StepVerifier.create(result).verifyComplete()
+ }
}
}
}
-}) \ No newline at end of file
+
+})
+
+private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
+ Router(routing, kafkaPublisherMap, ClientContext(), mock())
+
+private val perf3gppTopic = "PERF_PERF"
+private val perf3gppSinkMock = mock<KafkaSink>()
+private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+
+private val syslogTopic = "SYS_LOG"
+private val syslogSinkMock = mock<KafkaSink>()
+private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
+
+private val messageSinkMock = mock<Sink>()
+private val default_partition = None
+
+private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
+private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
+
+private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
+private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
index 571a6680..8616ce03 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
@@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import reactor.core.publisher.Flux
+
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({
.expectNoEvent(waitTime)
}
}
-
}
+
given("valid configuration from cbs") {
val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
@@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val receivedSink1 = it.elementAt(0)
- val receivedSink2 = it.elementAt(1)
+ val route1 = it.elementAt(0)
+ val route2 = it.elementAt(1)
+ val receivedSink1 = route1.sink
+ val receivedSink2 = route2.sink
+ assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
assertThat(receivedSink1.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+ assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
assertThat(receivedSink2.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
+
}.verifyComplete()
}
}
@@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({
})
+
+val PERF3GPP_REGIONAL = "perf3gpp_regional"
+val PERF3GPP_CENTRAL = "perf3gpp_central"
+
private val aafCredentials1 = ImmutableAafCredentials.builder()
.username("client")
.password("very secure password")
@@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder()
private val validConfiguration = JsonParser().parse("""
{
"streams_publishes": {
- "perf3gpp_regional": {
+ "$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
"username": "client",
@@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse("""
"topic_name": "REG_HVVES_PERF3GPP"
}
},
- "perf3gpp_central": {
+ "$PERF3GPP_CENTRAL": {
"type": "kafka",
"aaf_credentials": {
"username": "other_client",
@@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse("""
private val invalidConfiguration = JsonParser().parse("""
{
"streams_publishes": {
- "perf3gpp_regional": {
+ "$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
"username": "client",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
deleted file mode 100644
index eb0a3173..00000000
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import arrow.syntax.collections.tail
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.verify
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.ves.VesEventOuterClass
-import reactor.kafka.sender.KafkaSender
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-internal object KafkaSinkProviderTest : Spek({
- describe("non functional requirements") {
- given("sample configuration") {
- val config = CollectorConfiguration(
- maxRequestSizeBytes = 1024 * 1024,
- kafkaServers = "localhost:9090",
- routing = Routing(emptyList())
- )
-
- val cut = KafkaSinkProvider(config)
-
- on("sample clients") {
- val clients = listOf(
- ClientContext(),
- ClientContext(),
- ClientContext(),
- ClientContext())
-
- it("should create only one instance of KafkaSender") {
- val sinks = clients.map(cut::invoke)
- val firstSink = sinks[0]
- val restOfSinks = sinks.tail()
-
- assertThat(restOfSinks).isNotEmpty
- assertThat(restOfSinks).allSatisfy { sink ->
- assertThat(firstSink.usesSameSenderAs(sink))
- .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
- .isTrue()
- }
- }
- }
- }
-
- given("dummy KafkaSender") {
- val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
- val cut = KafkaSinkProvider(kafkaSender)
-
- on("close") {
- cut.close().unsafeRunSync()
-
- it("should close KafkaSender") {
- verify(kafkaSender).close()
- }
- }
- }
- }
-})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index a6b32ed9..92719e94 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -107,8 +107,8 @@ object MetricsSpecification : Spek({
assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
.describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
.isEqualTo(2)
- assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
- .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
+ assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC))
+ .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric")
.isEqualTo(1)
}
}
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysFailingSink(basicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 50fe098c..61a9a356 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
logger.info { "Processed $runs connections each containing $numMessages msgs." }
- logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
+ logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" }
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
@@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({
})
-private const val ONE_MILION = 1_000_000.0
+private const val ONE_MILLION = 1_000_000.0
private val rand = Random()
private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 109915a1..ec540606 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,6 +27,7 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -37,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -47,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) : AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : Closeable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -59,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable {
sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
- healthStateProvider)
+ healthStateProvider
+ )
+
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
@@ -67,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable {
throw IllegalStateException("Collector not available.")
}
- override fun close() {
- collectorProvider.close().unsafeRunSync()
+
+ fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ return sink.sentMessages
+ }
+
+ fun handleConnection(vararg packets: ByteBuf) {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
+ override fun close() = collectorProvider.close()
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
-
class DummySinkProvider(private val sink: Sink) : SinkProvider {
- private val active = AtomicBoolean(true)
+ private val sinkInitialized = AtomicBoolean(false)
- override fun invoke(ctx: ClientContext) = sink
-
- override fun close() = IO {
- active.set(false)
+ override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+ sinkInitialized.set(true)
+ sink
}
- val closed get() = !active.get()
-
+ override fun close() =
+ if (sinkInitialized.get()) {
+ sink.close()
+ } else {
+ IO.unit
+ }
}
private val timeout = Duration.ofSeconds(10)
-fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
- return sink.sentMessages
-}
-
-fun Sut.handleConnection(vararg packets: ByteBuf) {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
-}
-
-fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 21c5c189..5d215fc5 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.None
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -65,12 +65,28 @@ object VesHvSpecification : Spek({
.hasSize(2)
}
+ it("should create sink lazily") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ // just connecting should not create sink
+ sut.handleConnection()
+ sut.close().unsafeRunSync()
+
+ // then
+ assertThat(sink.closed).isFalse()
+ }
+
it("should close sink when closing collector provider") {
- val (sut, _) = vesHvWithStoringSink()
+ val (sut, sink) = vesHvWithStoringSink()
+ // given Sink initialized
+ // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
+ sut.handleConnection(vesWireFrameMessage(PERF3GPP))
- sut.close()
+ // when
+ sut.close().unsafeRunSync()
- assertThat(sut.sinkProvider.closed).isTrue()
+ // then
+ assertThat(sink.closed).isTrue()
}
}
@@ -145,14 +161,14 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
}
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -161,14 +177,14 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages[0].topic).describedAs("first message topic")
+ assertThat(messages[0].targetTopic).describedAs("first message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[1].topic).describedAs("second message topic")
+ assertThat(messages[1].targetTopic).describedAs("second message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[2].topic).describedAs("last message topic")
- .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ assertThat(messages[2].targetTopic).describedAs("last message topic")
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
}
it("should drop message if route was not found") {
@@ -181,7 +197,7 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
@@ -205,7 +221,7 @@ object VesHvSpecification : Spek({
it("should update collector") {
val firstCollector = sut.collector
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val collectorAfterUpdate = sut.collector
assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,21 +229,21 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
+ sut.configurationProvider.updateConfiguration(emptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should change domain routing") {
@@ -236,22 +252,22 @@ object VesHvSpecification : Spek({
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should update routing for each client sending one message") {
@@ -261,7 +277,7 @@ object VesHvSpecification : Spek({
Flux.range(0, messagesAmount).doOnNext {
if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
}
}.doOnNext {
sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -269,8 +285,8 @@ object VesHvSpecification : Spek({
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
@@ -287,7 +303,7 @@ object VesHvSpecification : Spek({
val incomingMessages = Flux.range(0, messageStreamSize)
.doOnNext {
if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
println("config changed")
}
}
@@ -297,8 +313,8 @@ object VesHvSpecification : Spek({
sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
@@ -320,7 +336,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +365,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index a398967d..c465fd91 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,67 +20,21 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
-const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
-const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
-const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
-
-val configWithBasicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithTwoDomainsToOneTopicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build(),
- ImmutableKafkaSink.builder()
- .name(HEARTBEAT.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build(),
- ImmutableKafkaSink.builder()
- .name(MEASUREMENT.domainName)
- .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithDifferentRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(ALTERNATE_PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithEmptyRouting = emptySequence<KafkaSink>()
-
-
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
- fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
+ fun updateConfiguration(routing: Routing) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(kafkaSinkSequence)
+ configStream.onNext(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index b599a076..a450b794 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,7 +54,7 @@ class FakeMetrics : Metrics {
override fun notifyMessageSent(msg: RoutedMessage) {
messagesSentCount++
- messagesSentToTopic.compute(msg.topic) { k, _ ->
+ messagesSentToTopic.compute(msg.targetTopic) { k, _ ->
messagesSentToTopic[k]?.inc() ?: 1
}
lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
new file mode 100644
index 00000000..e9914ef1
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+
+const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
+const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
+const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
+const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+
+private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
+ .name("PERF3GPP")
+ .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+ .topicName(PERF3GPP_TOPIC)
+ .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+ .build()
+private val alternativeKafkaSink = ImmutableKafkaSink.builder()
+ .name("ALTERNATE")
+ .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+ .build()
+
+
+val basicRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink)
+)
+
+val alternativeRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink)
+)
+
+val twoDomainsToOneTopicRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink),
+ Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink),
+ Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink)
+)
+
+val emptyRouting: Routing = emptyList()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 51f724e0..160defdb 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
@@ -30,6 +31,7 @@ import reactor.core.publisher.Flux
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
/**
@@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong
*/
class StoringSink : Sink {
private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+ private val active = AtomicBoolean(true)
+ val closed get() = !active.get()
val sentMessages: List<RoutedMessage>
get() = sent.toList()
@@ -45,6 +49,13 @@ class StoringSink : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
+
+ /*
+ * TOD0: if the code would look like:
+ * ```IO { active.set(false) }```
+ * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
+ */
+ override fun close() = active.set(false).run { IO.unit }
}
/**
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
index e4d147b1..04f9be63 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,4 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.domain
-data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
+import arrow.core.Option
+
+
+data class RoutedMessage(val message: VesMessage,
+ val targetTopic: String,
+ val partition: Option<Int>)
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 2fb44768..c04c2c95 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor(
override fun notifyMessageSent(msg: RoutedMessage) {
val now = Instant.now()
sentMessages.increment()
- sentMessagesByTopic(msg.topic).increment()
+ sentMessagesByTopic(msg.targetTopic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index d15dccef..aed4d928 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -59,9 +60,10 @@ object VesServer {
private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
CollectorFactory(
AdapterFactory.configurationProvider(config.cbs),
- AdapterFactory.sinkCreatorFactory(config.collector),
+ AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes
+ config.server.maxPayloadSizeBytes,
+ HealthState.INSTANCE
)
private fun logServerStarted(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index e452a5f4..f260f158 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.main
+import arrow.core.Option
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
@@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.ves.VesEventOuterClass
import java.time.Instant
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
@@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({
})
fun routedMessage(topic: String, partition: Int = 0) =
- vesEvent().let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- }
+ vesEvent().run { toRoutedMessage(topic, partition) }
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
- vesEvent().let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
- }
+ vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
- vesEvent().let { evt ->
- val builder = evt.toBuilder()
+ vesEvent().run {
+ val builder = toBuilder()
builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
- builder.build()
- }.let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- } \ No newline at end of file
+ builder.build().toRoutedMessage(topic, partition)
+ }
+
+private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
+ partition: Int,
+ receivedAt: Temporal = Instant.now()) =
+ RoutedMessage(
+ VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
+ topic,
+ Option.just(partition)
+ )
+