aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt6
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt5
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt23
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt74
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt32
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt)45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt60
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt56
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt123
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt)22
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt88
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt22
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt12
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt63
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt86
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt54
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt60
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt13
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt9
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt33
27 files changed, 504 insertions, 445 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
index 5ffa39df..e5a83ac4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
@@ -19,8 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.config.api.model
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-data class Routing(val routes: List<Route>)
+data class Route(val domain: String, val sink: KafkaSink)
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 })
+typealias Routing = List<Route>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index c8a156c5..04bba7e2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Either
import arrow.core.None
import arrow.core.Option
-import arrow.core.Some
import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
@@ -68,7 +65,7 @@ internal class ConfigurationValidator {
// collectorConfiguration
CollectorConfiguration(-1,
"I do not exist. I'm not even a URL :o",
- Routing(emptyList())),
+ emptyList()),
// end TOD0
logLevel
)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index 37192868..4b89488b 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -185,5 +185,5 @@ internal object ConfigurationValidatorTest : Spek({
}
})
-val emptyRouting = Routing(emptyList())
+val emptyRouting: Routing = emptyList()
val someFromEmptyRouting = Some(emptyRouting)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 0a1e2d43..1b92d90c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
-interface Sink {
+interface Sink : Closeable {
+ fun send(message: RoutedMessage) = send(Flux.just(message))
+
fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
+interface SinkProvider : Closeable {
+ operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
+}
+
+typealias ConfigurationProvider = () -> Flux<Routing>
+
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
@@ -42,11 +51,3 @@ interface Metrics {
fun notifyClientConnected()
fun notifyClientRejected(cause: ClientRejectionCause)
}
-
-interface SinkProvider : Closeable {
- operator fun invoke(ctx: ClientContext): Sink
-}
-
-interface ConfigurationProvider {
- operator fun invoke(): Flux<Sequence<KafkaSink>>
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index fa4f9670..2b29acd9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import java.util.concurrent.atomic.AtomicReference
/**
@@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Sequence<KafkaSink>>()
+ val config = AtomicReference<Routing>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(kafkaSinks, ctx),
- sink = sinkProvider(ctx),
+ router = Router(routing, sinkProvider, ctx, metrics),
metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
}
}
-
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index a95a44d5..6e2e20f7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -19,39 +19,75 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
+import arrow.core.None
import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
-class Router(private val routing: Routing, private val ctx: ClientContext) {
-
- constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
- Routing(
- kafkaSinks.map { Route(it.name(), it.topicName()) }.toList()
- ),
- ctx)
+class Router internal constructor(private val routing: Routing,
+ private val messageSinks: Map<String, Lazy<Sink>>,
+ private val ctx: ClientContext,
+ private val metrics: Metrics) {
+ constructor(routing: Routing,
+ sinkProvider: SinkProvider,
+ ctx: ClientContext,
+ metrics: Metrics) :
+ this(routing,
+ constructMessageSinks(routing, sinkProvider, ctx),
+ ctx,
+ metrics) {
+ logger.debug(ctx::mdc) { "Routing for client: $routing" }
+ logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" }
+ }
- fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } }
- .map { it.routeMessage(message) }
+ .fold({
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
+ Flux.empty<Route>()
+ }, {
+ logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
+ Flux.just(it)
+ })
+ .flatMap {
+ val sinkTopic = it.sink.topicName()
+ messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
+ }
- private fun Route.routeMessage(message: VesMessage) =
- RoutedMessage(targetTopic, partitioning(message.header), message)
+ private fun routeFor(header: CommonEventHeader) =
+ routing.find { it.domain == header.domain }.toOption()
- private fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- routing.routes.find { it.domain == commonHeader.domain }.toOption()
+ private fun messageSinkFor(sinkTopic: String) = messageSinks
+ .getOrElse(sinkTopic) {
+ throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
+ }
companion object {
- private val logger = Logger(Routing::class)
+ private val logger = Logger(Router::class)
+ private val NONE_PARTITION = None
+
+ internal fun constructMessageSinks(routing: Routing,
+ sinkProvider: SinkProvider,
+ ctx: ClientContext) =
+ routing.map(Route::sink)
+ .distinctBy { it.topicName() }
+ .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
}
+
+ private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
}
+
+internal class MissingMessageSinkException(msg: String) : Throwable(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 6a2792c3..433e4d57 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
-import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -53,7 +49,6 @@ internal class VesHvCollector(
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val router: Router,
- private val sink: Sink,
private val metrics: Metrics) : Collector {
override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
@@ -62,10 +57,10 @@ internal class VesHvCollector(
.transform(::filterInvalidWireFrame)
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
- .transform(::routeMessage)
- .onErrorResume {
- metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
- logger.handleReactiveStreamError(clientContext, it) }
+ // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
+ .handleErrors()
+ .transform(::route)
+ .handleErrors()
.doFinally { releaseBuffersMemory() }
.then()
@@ -98,18 +93,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
- .flatMap(this::findRoute)
- .compose(sink::send)
+ private fun route(flux: Flux<VesMessage>) = flux
+ .flatMap(router::route)
.doOnNext(this::updateSinkMetrics)
- private fun findRoute(msg: VesMessage) = router
- .findDestination(msg)
- .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
- .filterEmptyWithLog(logger, clientContext::fullMdc,
- { "Found route for message: ${it.topic}, partition: ${it.partition}" },
- { "Could not find route for message" })
-
private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
when (consumedMessage) {
is SuccessfullyConsumedMessage ->
@@ -119,6 +106,11 @@ internal class VesHvCollector(
}
}
+ private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
+ metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
+ logger.handleReactiveStreamError(clientContext, it)
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 10fe0a51..20b11753 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
@@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
- KafkaSinkProvider(config)
+ fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
ConfigurationProviderImpl(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index 1f5df371..185693c0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<Sequence<KafkaSink>> =
+ override fun invoke(): Flux<Routing> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient) = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
firstRequestDelay,
requestInterval)
.doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
- .map(::createCollectorConfiguration)
+ .map(::createRoutingDescription)
.onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
.retryWhen(retry)
-
- private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
- try {
- DataStreams.namedSinks(configuration)
- .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
- .map(streamParser::unsafeParse)
- .asSequence()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
+ private fun createRoutingDescription(configuration: JsonObject): Routing = try {
+ DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .map { Route(it.name(), it) }
+ .asIterable()
+ .toList()
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
companion object {
private const val MAX_RETRIES = 5L
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 5052cc5c..2ce0f42f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -39,41 +40,39 @@ import reactor.kafka.sender.SenderRecord
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
- private val ctx: ClientContext) : Sink {
+internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+ private val ctx: ClientContext) : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
- messages.map(::vesToKafkaRecord).let { records ->
- sender.send(records).map {
- val msg = it.correlationMetadata()
- if (it.exception() == null) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ messages.map(::vesToKafkaRecord)
+ .compose { sender.send(it) }
+ .map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
+ } else {
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
- SuccessfullyConsumedMessage(msg)
- } else {
- logger.warn(ctx::fullMdc, Marker.Invoke()) {
- "Failed to send message to Kafka. Reason: ${it.exception().message}"
- }
- logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
- FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
- }
- }
private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
SenderRecord.create(
- routed.topic,
- routed.partition,
+ routed.targetTopic,
+ routed.partition.orNull(),
FILL_TIMESTAMP_LATER,
routed.message.header,
routed.message,
routed)
- internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
-
companion object {
private val FILL_TIMESTAMP_LATER: Long? = null
- private val logger = Logger(KafkaSink::class)
+ private val logger = Logger(KafkaPublisher::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 96e45a02..7a498652 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,66 +20,38 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.effects.IO
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.createKafkaSender
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.lang.Integer.max
+import java.util.Collections.synchronizedMap
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider internal constructor(
- private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
-
- constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
-
- override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+internal class KafkaSinkProvider : SinkProvider {
+ private val messageSinks = synchronizedMap(
+ mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
+ )
+
+ override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+ messageSinks.computeIfAbsent(stream, ::createKafkaSender).let {
+ KafkaPublisher(it, ctx)
+ }
+ }
override fun close() = IO {
- kafkaSender.close()
- logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+ messageSinks.values.forEach { it.close() }
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
}
companion object {
private val logger = Logger(KafkaSinkProvider::class)
- private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
- private const val BUFFER_MEMORY_MULTIPLIER = 32
- private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
-
- private fun constructKafkaSender(config: CollectorConfiguration) =
- KafkaSender.create(constructSenderOptions(config))
-
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
- .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
- .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
- .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
- .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
- .producerProperty(RETRIES_CONFIG, 1)
- .producerProperty(ACKS_CONFIG, "1")
- .stopOnError(false)
-
- private fun maxRequestSize(maxRequestSizeBytes: Int) =
- (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
-
- private fun bufferMemory(maxRequestSizeBytes: Int) =
- max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
new file mode 100644
index 00000000..40de8c51
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+
+
+private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
+private const val BUFFER_MEMORY_MULTIPLIER = 32
+private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
+
+internal fun createKafkaSender(sinkStream: SinkStream) =
+ (sinkStream as KafkaSink).let { kafkaSink ->
+ KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers())
+ .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink))
+ .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink))
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
+ .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
+ .stopOnError(false)
+ )
+ }
+
+private fun maxRequestSize(kafkaSink: KafkaSink) =
+ (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt()
+
+private fun bufferMemory(kafkaSink: KafkaSink) =
+ Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes())
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index b8b55865..6b9c6803 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -20,22 +20,30 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
-import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
/**
@@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
* @since May 2018
*/
object RouterTest : Spek({
- given("sample configuration") {
- val config = Routing(listOf(
- Route(PERF3GPP.domainName, "ves_rtpm", { 2 }),
- Route(SYSLOG.domainName, "ves_trace")
- ))
- val cut = Router(config, ClientContext())
-
- on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
-
- it("should have route available") {
- assertThat(result).isNotNull()
- }
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
- }
+ describe("Router") {
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
- }
+ whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+ whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
- }
- }
+ val messageSinkMap = mapOf(
+ Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+ Pair(syslogTopic, lazyOf(messageSinkMock))
+ )
- on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
+ given("sample routing specification") {
+ val cut = router(defaultRouting, messageSinkMap)
- it("should have route available") {
- assertThat(result).isNotNull()
- }
+ on("message with existing route (rtpm)") {
+ whenever(messageSinkMock.send(routedPerf3GppMessage))
+ .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
- }
+ it("should be properly routed") {
+ val result = cut.route(perf3gppMessage)
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
+ assertThat(result).isNotNull()
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedPerf3gppMessage)
+ .verifyComplete()
+
+ verify(perf3gppSinkMock).topicName()
+ verify(messageSinkMock).send(routedPerf3GppMessage)
+ }
}
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ on("message with existing route (syslog)") {
+ whenever(messageSinkMock.send(routedSyslogMessage))
+ .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
+ val result = cut.route(syslogMessage)
+
+ it("should be properly routed") {
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedSyslogMessage)
+ .verifyComplete()
+
+ verify(syslogSinkMock).topicName()
+ verify(messageSinkMock).send(routedSyslogMessage)
+ }
}
- }
- on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
+ on("message with unknown route") {
+ val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+ val result = cut.route(message)
- it("should not have route available") {
- assertThat(result).isEqualTo(None)
+ it("should not have route available") {
+ StepVerifier.create(result).verifyComplete()
+ }
}
}
}
-}) \ No newline at end of file
+
+})
+
+private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
+ Router(routing, kafkaPublisherMap, ClientContext(), mock())
+
+private val perf3gppTopic = "PERF_PERF"
+private val perf3gppSinkMock = mock<KafkaSink>()
+private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+
+private val syslogTopic = "SYS_LOG"
+private val syslogSinkMock = mock<KafkaSink>()
+private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
+
+private val messageSinkMock = mock<Sink>()
+private val default_partition = None
+
+private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
+private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
+
+private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
+private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
index 571a6680..8616ce03 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
@@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import reactor.core.publisher.Flux
+
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({
.expectNoEvent(waitTime)
}
}
-
}
+
given("valid configuration from cbs") {
val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
@@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val receivedSink1 = it.elementAt(0)
- val receivedSink2 = it.elementAt(1)
+ val route1 = it.elementAt(0)
+ val route2 = it.elementAt(1)
+ val receivedSink1 = route1.sink
+ val receivedSink2 = route2.sink
+ assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
assertThat(receivedSink1.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+ assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
assertThat(receivedSink2.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
+
}.verifyComplete()
}
}
@@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({
})
+
+val PERF3GPP_REGIONAL = "perf3gpp_regional"
+val PERF3GPP_CENTRAL = "perf3gpp_central"
+
private val aafCredentials1 = ImmutableAafCredentials.builder()
.username("client")
.password("very secure password")
@@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder()
private val validConfiguration = JsonParser().parse("""
{
"streams_publishes": {
- "perf3gpp_regional": {
+ "$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
"username": "client",
@@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse("""
"topic_name": "REG_HVVES_PERF3GPP"
}
},
- "perf3gpp_central": {
+ "$PERF3GPP_CENTRAL": {
"type": "kafka",
"aaf_credentials": {
"username": "other_client",
@@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse("""
private val invalidConfiguration = JsonParser().parse("""
{
"streams_publishes": {
- "perf3gpp_regional": {
+ "$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
"username": "client",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
deleted file mode 100644
index eb0a3173..00000000
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import arrow.syntax.collections.tail
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.verify
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.ves.VesEventOuterClass
-import reactor.kafka.sender.KafkaSender
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-internal object KafkaSinkProviderTest : Spek({
- describe("non functional requirements") {
- given("sample configuration") {
- val config = CollectorConfiguration(
- maxRequestSizeBytes = 1024 * 1024,
- kafkaServers = "localhost:9090",
- routing = Routing(emptyList())
- )
-
- val cut = KafkaSinkProvider(config)
-
- on("sample clients") {
- val clients = listOf(
- ClientContext(),
- ClientContext(),
- ClientContext(),
- ClientContext())
-
- it("should create only one instance of KafkaSender") {
- val sinks = clients.map(cut::invoke)
- val firstSink = sinks[0]
- val restOfSinks = sinks.tail()
-
- assertThat(restOfSinks).isNotEmpty
- assertThat(restOfSinks).allSatisfy { sink ->
- assertThat(firstSink.usesSameSenderAs(sink))
- .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
- .isTrue()
- }
- }
- }
- }
-
- given("dummy KafkaSender") {
- val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
- val cut = KafkaSinkProvider(kafkaSender)
-
- on("close") {
- cut.close().unsafeRunSync()
-
- it("should close KafkaSender") {
- verify(kafkaSender).close()
- }
- }
- }
- }
-})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index a6b32ed9..92719e94 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -107,8 +107,8 @@ object MetricsSpecification : Spek({
assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
.describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
.isEqualTo(2)
- assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
- .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
+ assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC))
+ .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric")
.isEqualTo(1)
}
}
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysFailingSink(basicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 50fe098c..61a9a356 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
logger.info { "Processed $runs connections each containing $numMessages msgs." }
- logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
+ logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" }
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
@@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({
})
-private const val ONE_MILION = 1_000_000.0
+private const val ONE_MILLION = 1_000_000.0
private val rand = Random()
private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 109915a1..ec540606 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,6 +27,7 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -37,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -47,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) : AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : Closeable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -59,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable {
sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
- healthStateProvider)
+ healthStateProvider
+ )
+
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
@@ -67,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable {
throw IllegalStateException("Collector not available.")
}
- override fun close() {
- collectorProvider.close().unsafeRunSync()
+
+ fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ return sink.sentMessages
+ }
+
+ fun handleConnection(vararg packets: ByteBuf) {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
+ override fun close() = collectorProvider.close()
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
-
class DummySinkProvider(private val sink: Sink) : SinkProvider {
- private val active = AtomicBoolean(true)
+ private val sinkInitialized = AtomicBoolean(false)
- override fun invoke(ctx: ClientContext) = sink
-
- override fun close() = IO {
- active.set(false)
+ override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+ sinkInitialized.set(true)
+ sink
}
- val closed get() = !active.get()
-
+ override fun close() =
+ if (sinkInitialized.get()) {
+ sink.close()
+ } else {
+ IO.unit
+ }
}
private val timeout = Duration.ofSeconds(10)
-fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
- return sink.sentMessages
-}
-
-fun Sut.handleConnection(vararg packets: ByteBuf) {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
-}
-
-fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 21c5c189..5d215fc5 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.None
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -65,12 +65,28 @@ object VesHvSpecification : Spek({
.hasSize(2)
}
+ it("should create sink lazily") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ // just connecting should not create sink
+ sut.handleConnection()
+ sut.close().unsafeRunSync()
+
+ // then
+ assertThat(sink.closed).isFalse()
+ }
+
it("should close sink when closing collector provider") {
- val (sut, _) = vesHvWithStoringSink()
+ val (sut, sink) = vesHvWithStoringSink()
+ // given Sink initialized
+ // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
+ sut.handleConnection(vesWireFrameMessage(PERF3GPP))
- sut.close()
+ // when
+ sut.close().unsafeRunSync()
- assertThat(sut.sinkProvider.closed).isTrue()
+ // then
+ assertThat(sink.closed).isTrue()
}
}
@@ -145,14 +161,14 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
}
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -161,14 +177,14 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages[0].topic).describedAs("first message topic")
+ assertThat(messages[0].targetTopic).describedAs("first message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[1].topic).describedAs("second message topic")
+ assertThat(messages[1].targetTopic).describedAs("second message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[2].topic).describedAs("last message topic")
- .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ assertThat(messages[2].targetTopic).describedAs("last message topic")
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
}
it("should drop message if route was not found") {
@@ -181,7 +197,7 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
@@ -205,7 +221,7 @@ object VesHvSpecification : Spek({
it("should update collector") {
val firstCollector = sut.collector
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val collectorAfterUpdate = sut.collector
assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,21 +229,21 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
+ sut.configurationProvider.updateConfiguration(emptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should change domain routing") {
@@ -236,22 +252,22 @@ object VesHvSpecification : Spek({
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should update routing for each client sending one message") {
@@ -261,7 +277,7 @@ object VesHvSpecification : Spek({
Flux.range(0, messagesAmount).doOnNext {
if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
}
}.doOnNext {
sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -269,8 +285,8 @@ object VesHvSpecification : Spek({
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
@@ -287,7 +303,7 @@ object VesHvSpecification : Spek({
val incomingMessages = Flux.range(0, messageStreamSize)
.doOnNext {
if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
println("config changed")
}
}
@@ -297,8 +313,8 @@ object VesHvSpecification : Spek({
sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
@@ -320,7 +336,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +365,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index a398967d..c465fd91 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,67 +20,21 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
-const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
-const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
-const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
-
-val configWithBasicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithTwoDomainsToOneTopicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build(),
- ImmutableKafkaSink.builder()
- .name(HEARTBEAT.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build(),
- ImmutableKafkaSink.builder()
- .name(MEASUREMENT.domainName)
- .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithDifferentRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(ALTERNATE_PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithEmptyRouting = emptySequence<KafkaSink>()
-
-
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
- fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
+ fun updateConfiguration(routing: Routing) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(kafkaSinkSequence)
+ configStream.onNext(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index b599a076..a450b794 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,7 +54,7 @@ class FakeMetrics : Metrics {
override fun notifyMessageSent(msg: RoutedMessage) {
messagesSentCount++
- messagesSentToTopic.compute(msg.topic) { k, _ ->
+ messagesSentToTopic.compute(msg.targetTopic) { k, _ ->
messagesSentToTopic[k]?.inc() ?: 1
}
lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
new file mode 100644
index 00000000..e9914ef1
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+
+const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
+const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
+const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
+const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+
+private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
+ .name("PERF3GPP")
+ .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+ .topicName(PERF3GPP_TOPIC)
+ .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+ .build()
+private val alternativeKafkaSink = ImmutableKafkaSink.builder()
+ .name("ALTERNATE")
+ .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+ .build()
+
+
+val basicRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink)
+)
+
+val alternativeRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink)
+)
+
+val twoDomainsToOneTopicRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink),
+ Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink),
+ Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink)
+)
+
+val emptyRouting: Routing = emptyList()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 51f724e0..160defdb 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
@@ -30,6 +31,7 @@ import reactor.core.publisher.Flux
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
/**
@@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong
*/
class StoringSink : Sink {
private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+ private val active = AtomicBoolean(true)
+ val closed get() = !active.get()
val sentMessages: List<RoutedMessage>
get() = sent.toList()
@@ -45,6 +49,13 @@ class StoringSink : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
+
+ /*
+ * TOD0: if the code would look like:
+ * ```IO { active.set(false) }```
+ * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
+ */
+ override fun close() = active.set(false).run { IO.unit }
}
/**
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
index e4d147b1..04f9be63 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,4 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.domain
-data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
+import arrow.core.Option
+
+
+data class RoutedMessage(val message: VesMessage,
+ val targetTopic: String,
+ val partition: Option<Int>)
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 2fb44768..c04c2c95 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor(
override fun notifyMessageSent(msg: RoutedMessage) {
val now = Instant.now()
sentMessages.increment()
- sentMessagesByTopic(msg.topic).increment()
+ sentMessagesByTopic(msg.targetTopic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index d15dccef..aed4d928 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -59,9 +60,10 @@ object VesServer {
private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
CollectorFactory(
AdapterFactory.configurationProvider(config.cbs),
- AdapterFactory.sinkCreatorFactory(config.collector),
+ AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes
+ config.server.maxPayloadSizeBytes,
+ HealthState.INSTANCE
)
private fun logServerStarted(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index e452a5f4..f260f158 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.main
+import arrow.core.Option
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
@@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.ves.VesEventOuterClass
import java.time.Instant
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
@@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({
})
fun routedMessage(topic: String, partition: Int = 0) =
- vesEvent().let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- }
+ vesEvent().run { toRoutedMessage(topic, partition) }
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
- vesEvent().let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
- }
+ vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
- vesEvent().let { evt ->
- val builder = evt.toBuilder()
+ vesEvent().run {
+ val builder = toBuilder()
builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
- builder.build()
- }.let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- } \ No newline at end of file
+ builder.build().toRoutedMessage(topic, partition)
+ }
+
+private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
+ partition: Int,
+ receivedAt: Temporal = Instant.now()) =
+ RoutedMessage(
+ VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
+ topic,
+ Option.just(partition)
+ )
+