aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-26 14:21:02 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-28 14:16:02 +0100
commit2174a045086e16611128b20a6d4357c04d9eac4a (patch)
tree6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-core/src/main
parent1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff)
Redefine Routing
As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt23
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt74
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt32
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt)45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt60
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt56
9 files changed, 195 insertions, 142 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 0a1e2d43..1b92d90c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
-interface Sink {
+interface Sink : Closeable {
+ fun send(message: RoutedMessage) = send(Flux.just(message))
+
fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
+interface SinkProvider : Closeable {
+ operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
+}
+
+typealias ConfigurationProvider = () -> Flux<Routing>
+
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
@@ -42,11 +51,3 @@ interface Metrics {
fun notifyClientConnected()
fun notifyClientRejected(cause: ClientRejectionCause)
}
-
-interface SinkProvider : Closeable {
- operator fun invoke(ctx: ClientContext): Sink
-}
-
-interface ConfigurationProvider {
- operator fun invoke(): Flux<Sequence<KafkaSink>>
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index fa4f9670..2b29acd9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import java.util.concurrent.atomic.AtomicReference
/**
@@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Sequence<KafkaSink>>()
+ val config = AtomicReference<Routing>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(kafkaSinks, ctx),
- sink = sinkProvider(ctx),
+ router = Router(routing, sinkProvider, ctx, metrics),
metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
}
}
-
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index a95a44d5..6e2e20f7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -19,39 +19,75 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
+import arrow.core.None
import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
-class Router(private val routing: Routing, private val ctx: ClientContext) {
-
- constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
- Routing(
- kafkaSinks.map { Route(it.name(), it.topicName()) }.toList()
- ),
- ctx)
+class Router internal constructor(private val routing: Routing,
+ private val messageSinks: Map<String, Lazy<Sink>>,
+ private val ctx: ClientContext,
+ private val metrics: Metrics) {
+ constructor(routing: Routing,
+ sinkProvider: SinkProvider,
+ ctx: ClientContext,
+ metrics: Metrics) :
+ this(routing,
+ constructMessageSinks(routing, sinkProvider, ctx),
+ ctx,
+ metrics) {
+ logger.debug(ctx::mdc) { "Routing for client: $routing" }
+ logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" }
+ }
- fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } }
- .map { it.routeMessage(message) }
+ .fold({
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
+ Flux.empty<Route>()
+ }, {
+ logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
+ Flux.just(it)
+ })
+ .flatMap {
+ val sinkTopic = it.sink.topicName()
+ messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
+ }
- private fun Route.routeMessage(message: VesMessage) =
- RoutedMessage(targetTopic, partitioning(message.header), message)
+ private fun routeFor(header: CommonEventHeader) =
+ routing.find { it.domain == header.domain }.toOption()
- private fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- routing.routes.find { it.domain == commonHeader.domain }.toOption()
+ private fun messageSinkFor(sinkTopic: String) = messageSinks
+ .getOrElse(sinkTopic) {
+ throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
+ }
companion object {
- private val logger = Logger(Routing::class)
+ private val logger = Logger(Router::class)
+ private val NONE_PARTITION = None
+
+ internal fun constructMessageSinks(routing: Routing,
+ sinkProvider: SinkProvider,
+ ctx: ClientContext) =
+ routing.map(Route::sink)
+ .distinctBy { it.topicName() }
+ .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
}
+
+ private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
}
+
+internal class MissingMessageSinkException(msg: String) : Throwable(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 6a2792c3..433e4d57 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
-import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -53,7 +49,6 @@ internal class VesHvCollector(
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val router: Router,
- private val sink: Sink,
private val metrics: Metrics) : Collector {
override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
@@ -62,10 +57,10 @@ internal class VesHvCollector(
.transform(::filterInvalidWireFrame)
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
- .transform(::routeMessage)
- .onErrorResume {
- metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
- logger.handleReactiveStreamError(clientContext, it) }
+ // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
+ .handleErrors()
+ .transform(::route)
+ .handleErrors()
.doFinally { releaseBuffersMemory() }
.then()
@@ -98,18 +93,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
- .flatMap(this::findRoute)
- .compose(sink::send)
+ private fun route(flux: Flux<VesMessage>) = flux
+ .flatMap(router::route)
.doOnNext(this::updateSinkMetrics)
- private fun findRoute(msg: VesMessage) = router
- .findDestination(msg)
- .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
- .filterEmptyWithLog(logger, clientContext::fullMdc,
- { "Found route for message: ${it.topic}, partition: ${it.partition}" },
- { "Could not find route for message" })
-
private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
when (consumedMessage) {
is SuccessfullyConsumedMessage ->
@@ -119,6 +106,11 @@ internal class VesHvCollector(
}
}
+ private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
+ metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
+ logger.handleReactiveStreamError(clientContext, it)
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 10fe0a51..20b11753 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
@@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
- KafkaSinkProvider(config)
+ fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
ConfigurationProviderImpl(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index 1f5df371..185693c0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<Sequence<KafkaSink>> =
+ override fun invoke(): Flux<Routing> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient) = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
firstRequestDelay,
requestInterval)
.doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
- .map(::createCollectorConfiguration)
+ .map(::createRoutingDescription)
.onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
.retryWhen(retry)
-
- private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
- try {
- DataStreams.namedSinks(configuration)
- .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
- .map(streamParser::unsafeParse)
- .asSequence()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
+ private fun createRoutingDescription(configuration: JsonObject): Routing = try {
+ DataStreams.namedSinks(configuration)
+ .filter(streamOfType(KAFKA))
+ .map(streamParser::unsafeParse)
+ .map { Route(it.name(), it) }
+ .asIterable()
+ .toList()
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
companion object {
private const val MAX_RETRIES = 5L
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 5052cc5c..2ce0f42f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -39,41 +40,39 @@ import reactor.kafka.sender.SenderRecord
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
- private val ctx: ClientContext) : Sink {
+internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+ private val ctx: ClientContext) : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
- messages.map(::vesToKafkaRecord).let { records ->
- sender.send(records).map {
- val msg = it.correlationMetadata()
- if (it.exception() == null) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ messages.map(::vesToKafkaRecord)
+ .compose { sender.send(it) }
+ .map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
+ } else {
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
- SuccessfullyConsumedMessage(msg)
- } else {
- logger.warn(ctx::fullMdc, Marker.Invoke()) {
- "Failed to send message to Kafka. Reason: ${it.exception().message}"
- }
- logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
- FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
- }
- }
private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
SenderRecord.create(
- routed.topic,
- routed.partition,
+ routed.targetTopic,
+ routed.partition.orNull(),
FILL_TIMESTAMP_LATER,
routed.message.header,
routed.message,
routed)
- internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
-
companion object {
private val FILL_TIMESTAMP_LATER: Long? = null
- private val logger = Logger(KafkaSink::class)
+ private val logger = Logger(KafkaPublisher::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 96e45a02..7a498652 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,66 +20,38 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.effects.IO
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.createKafkaSender
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.lang.Integer.max
+import java.util.Collections.synchronizedMap
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider internal constructor(
- private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
-
- constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
-
- override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+internal class KafkaSinkProvider : SinkProvider {
+ private val messageSinks = synchronizedMap(
+ mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
+ )
+
+ override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+ messageSinks.computeIfAbsent(stream, ::createKafkaSender).let {
+ KafkaPublisher(it, ctx)
+ }
+ }
override fun close() = IO {
- kafkaSender.close()
- logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+ messageSinks.values.forEach { it.close() }
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
}
companion object {
private val logger = Logger(KafkaSinkProvider::class)
- private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
- private const val BUFFER_MEMORY_MULTIPLIER = 32
- private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
-
- private fun constructKafkaSender(config: CollectorConfiguration) =
- KafkaSender.create(constructSenderOptions(config))
-
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
- .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
- .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
- .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
- .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
- .producerProperty(RETRIES_CONFIG, 1)
- .producerProperty(ACKS_CONFIG, "1")
- .stopOnError(false)
-
- private fun maxRequestSize(maxRequestSizeBytes: Int) =
- (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
-
- private fun bufferMemory(maxRequestSizeBytes: Int) =
- max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
new file mode 100644
index 00000000..40de8c51
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+
+
+private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
+private const val BUFFER_MEMORY_MULTIPLIER = 32
+private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
+
+internal fun createKafkaSender(sinkStream: SinkStream) =
+ (sinkStream as KafkaSink).let { kafkaSink ->
+ KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers())
+ .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink))
+ .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink))
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
+ .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
+ .stopOnError(false)
+ )
+ }
+
+private fun maxRequestSize(kafkaSink: KafkaSink) =
+ (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt()
+
+private fun bufferMemory(kafkaSink: KafkaSink) =
+ Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes())