diff options
Diffstat (limited to 'sources/hv-collector-core/src')
8 files changed, 69 insertions, 59 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ba0a9eee..0039ef62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable @@ -36,5 +35,5 @@ interface CollectorProvider : Closeable { } interface Server { - fun start(): IO<ServerHandle> + fun start(): Mono<ServerHandle> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 1c79abd2..8fb4e80d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration, } private fun createVesHvCollector(ctx: ClientContext): Collector = - VesHvCollector( + HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index 618b818f..7d8f0cb1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -44,7 +44,7 @@ import reactor.core.publisher.Mono * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class VesHvCollector( +internal class HvVesCollector( private val clientContext: ClientContext, private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, @@ -116,6 +116,6 @@ internal class VesHvCollector( filterFailedWithLog(logger, clientContext::fullMdc, predicate) companion object { - private val logger = Logger(VesHvCollector::class) + private val logger = Logger(HvVesCollector::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 6e2e20f7..b03b89e1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing, fun route(message: VesMessage): Flux<ConsumedMessage> = routeFor(message.header) - .fold({ - metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) - logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } - logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } - Flux.empty<Route>() - }, { - logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } - Flux.just(it) - }) + .fold({ routeNotFound(message) }, { routeFound(message, it) }) .flatMap { val sinkTopic = it.sink.topicName() messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } + private fun routeNotFound(message: VesMessage): Flux<Route> { + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: $routing" } + return Flux.empty<Route>() + } + + private fun routeFound(message: VesMessage, route: Route): Flux<Route> { + logger.trace(ctx::fullMdc) { + "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION" + } + return Flux.just(route) + } + + private fun routeFor(header: CommonEventHeader) = routing.find { it.domain == header.domain }.toOption() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 2ce0f42f..7b726ab4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 7a498652..86980832 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender @@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import reactor.kafka.sender.KafkaSender import java.util.Collections.synchronizedMap @@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider { } } - override fun close() = IO { - messageSinks.values.forEach { it.close() } - logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } - } + override fun close(): Mono<Void> = + Flux.fromIterable(messageSinks.values) + .publishOn(Schedulers.elastic()) + .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close) + .then() + .doOnSuccess { + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } + } companion object { private val logger = Logger(KafkaSinkProvider::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 3e19414d..a208384a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.Option import arrow.core.getOrElse -import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider @@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { - override fun start(): IO<ServerHandle> = IO { - TcpServer.create() - .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } - .configureSsl() - .handle(this::handleConnection) - .doOnUnbound { - logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } - collectorProvider.close().unsafeRunSync() - } - .let { NettyServerHandle(it.bindNow()) } - } + override fun start(): Mono<ServerHandle> = + Mono.defer { + TcpServer.create() + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } + .configureSsl() + .handle(this::handleConnection) + .bind() + .map { + NettyServerHandle(it, closeAction()) + } + } + + private fun closeAction(): Mono<Void> = + collectorProvider.close().doOnSuccess { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + } + private fun TcpServer.configureSsl() = sslContext @@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = withNewClientContextFrom(nettyInbound, nettyOutbound) { clientContext -> - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + logger.debug(clientContext::fullMdc) { "Client connection request received" } clientContext.clientAddress .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } @@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() - logger.info(clientContext::fullMdc) { "Handling new client connection" } + logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } val collector = collectorProvider(clientContext) return collector.handleClient(clientContext, nettyInbound) } private fun Collector.handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - connection - .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) - .logConnectionClosed(clientContext) - }.run { - handleConnection(nettyInbound.createDataStream()) - } + nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + connection + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + handleConnection(nettyInbound.createDataStream()) + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b735138d..ca9d28ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.wire -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink @@ -63,26 +62,22 @@ internal class WireChunkDecoder( private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) - .unsafeRunSync() } - private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> + private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err -> when (err) { - is InvalidWireFrame -> IO { + is InvalidWireFrame -> next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { + is MissingWireFrameBytes -> { logEndOfData() next.complete() } } } - private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } + private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame -> + logDecodedWireMessage(frame) + next.next(frame) } private fun logIncomingMessage(wire: ByteBuf) { |