From 2174a045086e16611128b20a6d4357c04d9eac4a Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Tue, 26 Mar 2019 14:21:02 +0100 Subject: Redefine Routing As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka --- .../dcae/collectors/veshv/boundary/adapters.kt | 23 ++++--- .../collectors/veshv/factory/CollectorFactory.kt | 10 ++- .../org/onap/dcae/collectors/veshv/impl/Router.kt | 74 ++++++++++++++------ .../dcae/collectors/veshv/impl/VesHvCollector.kt | 32 ++++----- .../veshv/impl/adapters/AdapterFactory.kt | 4 +- .../impl/adapters/ConfigurationProviderImpl.kt | 33 ++++----- .../veshv/impl/adapters/kafka/KafkaPublisher.kt | 78 +++++++++++++++++++++ .../veshv/impl/adapters/kafka/KafkaSink.kt | 79 ---------------------- .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 60 +++++----------- .../org/onap/dcae/collectors/veshv/impl/kafka.kt | 56 +++++++++++++++ 10 files changed, 251 insertions(+), 198 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt (limited to 'sources/hv-collector-core/src/main/kotlin') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 0a1e2d43..1b92d90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux -interface Sink { +interface Sink : Closeable { + fun send(message: RoutedMessage) = send(Flux.just(message)) + fun send(messages: Flux): Flux } +interface SinkProvider : Closeable { + operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy +} + +typealias ConfigurationProvider = () -> Flux + interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) @@ -42,11 +51,3 @@ interface Metrics { fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } - -interface SinkProvider : Closeable { - operator fun invoke(ctx: ClientContext): Sink -} - -interface ConfigurationProvider { - operator fun invoke(): Flux> -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index fa4f9670..2b29acd9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference>() + val config = AtomicReference() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(kafkaSinks: Sequence, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(kafkaSinks, ctx), - sink = sinkProvider(ctx), + router = Router(routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) } } - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index a95a44d5..6e2e20f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,39 +19,75 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.None import arrow.core.toOption +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux -class Router(private val routing: Routing, private val ctx: ClientContext) { - - constructor(kafkaSinks: Sequence, ctx: ClientContext) : this( - Routing( - kafkaSinks.map { Route(it.name(), it.topicName()) }.toList() - ), - ctx) +class Router internal constructor(private val routing: Routing, + private val messageSinks: Map>, + private val ctx: ClientContext, + private val metrics: Metrics) { + constructor(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext, + metrics: Metrics) : + this(routing, + constructMessageSinks(routing, sinkProvider, ctx), + ctx, + metrics) { + logger.debug(ctx::mdc) { "Routing for client: $routing" } + logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" } + } - fun findDestination(message: VesMessage): Option = + fun route(message: VesMessage): Flux = routeFor(message.header) - .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } } - .map { it.routeMessage(message) } + .fold({ + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } + Flux.empty() + }, { + logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } + Flux.just(it) + }) + .flatMap { + val sinkTopic = it.sink.topicName() + messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) + } - private fun Route.routeMessage(message: VesMessage) = - RoutedMessage(targetTopic, partitioning(message.header), message) + private fun routeFor(header: CommonEventHeader) = + routing.find { it.domain == header.domain }.toOption() - private fun routeFor(commonHeader: CommonEventHeader): Option = - routing.routes.find { it.domain == commonHeader.domain }.toOption() + private fun messageSinkFor(sinkTopic: String) = messageSinks + .getOrElse(sinkTopic) { + throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic") + } companion object { - private val logger = Logger(Routing::class) + private val logger = Logger(Router::class) + private val NONE_PARTITION = None + + internal fun constructMessageSinks(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext) = + routing.map(Route::sink) + .distinctBy { it.topicName() } + .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) } + + private fun Lazy.send(message: RoutedMessage) = value.send(message) } + +internal class MissingMessageSinkException(msg: String) : Throwable(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 6a2792c3..433e4d57 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -53,7 +49,6 @@ internal class VesHvCollector( private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, - private val sink: Sink, private val metrics: Metrics) : Collector { override fun handleConnection(dataStream: Flux): Mono = @@ -62,10 +57,10 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) } + // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here + .handleErrors() + .transform(::route) + .handleErrors() .doFinally { releaseBuffersMemory() } .then() @@ -98,18 +93,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux): Flux = flux - .flatMap(this::findRoute) - .compose(sink::send) + private fun route(flux: Flux) = flux + .flatMap(router::route) .doOnNext(this::updateSinkMetrics) - private fun findRoute(msg: VesMessage) = router - .findDestination(msg) - .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) } - .filterEmptyWithLog(logger, clientContext::fullMdc, - { "Found route for message: ${it.topic}, partition: ${it.partition}" }, - { "Could not find route for message" }) - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { when (consumedMessage) { is SuccessfullyConsumedMessage -> @@ -119,6 +106,11 @@ internal class VesHvCollector( } } + private fun Flux.handleErrors(): Flux = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 10fe0a51..20b11753 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,8 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = - KafkaSinkProvider(config) + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 1f5df371..185693c0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,19 +22,21 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono> = + override fun invoke(): Flux = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -81,26 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono> = cbsClient + private fun handleUpdates(cbsClient: CbsClient) = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createCollectorConfiguration) + .map(::createRoutingDescription) .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } .retryWhen(retry) - - private fun createCollectorConfiguration(configuration: JsonObject): Sequence = - try { - DataStreams.namedSinks(configuration) - .filter(StreamPredicates.streamOfType(StreamType.KAFKA)) - .map(streamParser::unsafeParse) - .asSequence() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - + private fun createRoutingDescription(configuration: JsonObject): Routing = try { + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt new file mode 100644 index 00000000..2ce0f42f --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderRecord + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class KafkaPublisher(private val sender: KafkaSender, + private val ctx: ClientContext) : Sink { + + override fun send(messages: Flux): Flux = + messages.map(::vesToKafkaRecord) + .compose { sender.send(it) } + .map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) + } else { + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) + } + } + + private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord = + SenderRecord.create( + routed.targetTopic, + routed.partition.orNull(), + FILL_TIMESTAMP_LATER, + routed.message.header, + routed.message, + routed) + + companion object { + private val FILL_TIMESTAMP_LATER: Long? = null + private val logger = Logger(KafkaPublisher::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt deleted file mode 100644 index 5052cc5c..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ConsumedMessage -import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage -import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.core.publisher.Flux -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderRecord - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class KafkaSink(private val sender: KafkaSender, - private val ctx: ClientContext) : Sink { - - override fun send(messages: Flux): Flux = - messages.map(::vesToKafkaRecord).let { records -> - sender.send(records).map { - val msg = it.correlationMetadata() - if (it.exception() == null) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - "Message sent to Kafka with metadata: ${it.recordMetadata()}" - } - SuccessfullyConsumedMessage(msg) - } else { - logger.warn(ctx::fullMdc, Marker.Invoke()) { - "Failed to send message to Kafka. Reason: ${it.exception().message}" - } - logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } - FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) - } - } - } - - private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord = - SenderRecord.create( - routed.topic, - routed.partition, - FILL_TIMESTAMP_LATER, - routed.message.header, - routed.message, - routed) - - internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender - - companion object { - private val FILL_TIMESTAMP_LATER: Long? = null - private val logger = Logger(KafkaSink::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 96e45a02..7a498652 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,66 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.effects.IO -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.lang.Integer.max +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk * @since June 2018 */ -internal class KafkaSinkProvider internal constructor( - private val kafkaSender: KafkaSender) : SinkProvider { - - constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) - - override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) +internal class KafkaSinkProvider : SinkProvider { + private val messageSinks = synchronizedMap( + mutableMapOf>() + ) + + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { + KafkaPublisher(it, ctx) + } + } override fun close() = IO { - kafkaSender.close() - logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + messageSinks.values.forEach { it.close() } + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } } companion object { private val logger = Logger(KafkaSinkProvider::class) - private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f - private const val BUFFER_MEMORY_MULTIPLIER = 32 - private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - - private fun constructKafkaSender(config: CollectorConfiguration) = - KafkaSender.create(constructSenderOptions(config)) - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) - .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) - .producerProperty(RETRIES_CONFIG, 1) - .producerProperty(ACKS_CONFIG, "1") - .stopOnError(false) - - private fun maxRequestSize(maxRequestSizeBytes: Int) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - - private fun bufferMemory(maxRequestSizeBytes: Int) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt new file mode 100644 index 00000000..40de8c51 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.apache.kafka.clients.producer.ProducerConfig +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions + + +private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f +private const val BUFFER_MEMORY_MULTIPLIER = 32 +private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 + +internal fun createKafkaSender(sinkStream: SinkStream) = + (sinkStream as KafkaSink).let { kafkaSink -> + KafkaSender.create(SenderOptions.create() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers()) + .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink)) + .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink)) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) + .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .stopOnError(false) + ) + } + +private fun maxRequestSize(kafkaSink: KafkaSink) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt() + +private fun bufferMemory(kafkaSink: KafkaSink) = + Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()) -- cgit 1.2.3-korg