From 67689405071acdad2b26d5112b3662605e474ce9 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 14 Jun 2018 09:48:46 +0200 Subject: Various improvements * Kotlin upgrade * Monad usage on APIs * Idle timeout * Simulator enhancements Closes ONAP-390 Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 8 ++- .../org/onap/dcae/collectors/veshv/impl/Router.kt | 4 +- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 6 +- .../veshv/impl/adapters/LoggingSinkProvider.kt | 3 +- .../veshv/impl/adapters/kafka/KafkaSink.kt | 22 ++++++- .../impl/adapters/kafka/ProtobufSerializer.kt | 2 +- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 73 +++++++++++++++------- .../veshv/impl/wire/StreamBufferEmitter.kt | 2 +- .../collectors/veshv/impl/wire/WireFrameSink.kt | 2 +- .../collectors/veshv/model/ServerConfiguration.kt | 5 +- .../onap/dcae/collectors/veshv/model/routing.kt | 4 +- 11 files changed, 96 insertions(+), 35 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ed686fe8..d6158481 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -32,9 +33,10 @@ interface Collector { typealias CollectorProvider = () -> Collector interface Server { - fun start(): Mono + fun start(): IO } -interface ServerFactory { - fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server +abstract class ServerHandle(val host: String, val port: Int) { + abstract fun shutdown(): IO + abstract fun await(): IO } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index f3f0a891..cee658b6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,10 +19,12 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage class Router(private val routing: Routing) { - fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message) + fun findDestination(message: VesMessage): Option = + routing.routeFor(message.header).map { it(message) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 222eaefa..033095ad 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector @@ -67,7 +68,10 @@ internal class VesHvCollector( wireChunkDecoder.release() } - private fun omitWhenNull(input: T, mapper: (T) -> V?): Mono = Mono.justOrEmpty(mapper(input)) + private fun omitWhenNull(input: T, mapper: (T) -> Option): Mono = + mapper(input).fold( + { Mono.empty() }, + { Mono.just(it) }) companion object { val logger = Logger(VesHvCollector::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index a5c41046..5f4bf354 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicLong @@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider { override fun invoke(config: CollectorConfiguration): Sink { return object : Sink { - private val logger = Logger(LoggingSinkProvider::class) private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider { companion object { const val INFO_LOGGING_FREQ = 100_000 + private val logger = Logger(LoggingSinkProvider::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 0a548a52..f8fa72a6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,27 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult +import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk * @since May 2018 */ internal class KafkaSink(private val sender: KafkaSender) : Sink { + private val sentMessages = AtomicLong(0) override fun send(messages: Flux): Flux { val records = messages.map(this::vesToKafkaRecord) - return sender.send(records) + val result = sender.send(records) .doOnNext(::logException) .filter(::isSuccessful) .map { it.correlationMetadata() } + + return if (logger.traceEnabled) { + result.doOnNext(::logSentMessage) + } else { + result + } } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord { @@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender) = senderResult.exception() == null + private fun logSentMessage(sentMsg: RoutedMessage) { + logger.trace { + val msgNum = sentMessages.incrementAndGet() + "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" + } + } + + private fun isSuccessful(senderResult: SenderResult) = senderResult.exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt index 9753d9e5..4e9932cc 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt @@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer * @author Piotr Jaszczyk * @since June 2018 */ -class ProtobufSerializer :Serializer { +class ProtobufSerializer : Serializer { override fun configure(configs: MutableMap?, isKey: Boolean) { // no configuration } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 65b3b29e..0426ceb1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher @@ -28,7 +30,9 @@ import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions +import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer +import java.time.Duration import java.util.function.BiFunction /** @@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val sslContextFactory: SslContextFactory, private val collectorProvider: CollectorProvider) : Server { - override fun start(): Mono { - logger.info { "Listening on port ${serverConfig.port}" } - return Mono.defer { - val nettyContext = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction> { t, u -> - handleConnection(t, u) - }) - Mono.never().doFinally { _ -> nettyContext.shutdown() } - } + override fun start(): IO = IO { + val ctx = TcpServer.builder() + .options(this::configureServer) + .build() + .start(BiFunction> { input, _ -> + handleConnection(input) + }) + NettyServerHandle(ctx) } private fun configureServer(opts: ServerOptions.Builder<*>) { @@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration)) } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono { - logger.debug("Got connection") - nettyOutbound.alloc() + private fun handleConnection(nettyInbound: NettyInbound): Mono { + logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + + val dataStream = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() - val sendHello = nettyOutbound - .options { it.flushOnEach() } - .sendString(Mono.just("ONAP_VES_HV/0.1\n")) - .then() + return collectorProvider() + .handleConnection(nettyInbound.context().channel().alloc(), dataStream) + } - val handleIncomingMessages = collectorProvider() - .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain()) + private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + onReadIdle(timeout.toMillis()) { + logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } + context().channel().close().addListener { - return sendHello.then(handleIncomingMessages) + if (it.isSuccess) + logger.debug { "Client disconnected because of idle timeout" } + else + logger.warn("Channel close failed", it.cause()) + } + } + return this + } + + private fun NettyInbound.logConnectionClosed(): NettyInbound { + context().onClose { + logger.info("Connection from ${remoteAddress()} has been closed") + } + return this } + + private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { + override fun shutdown() = IO { + ctx.shutdown() + } + + override fun await() = IO { + ctx.context.channel().closeFuture().sync() + } + } + companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt index 34a8b928..b788f511 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -56,7 +56,7 @@ internal class StreamBufferEmitter( else -> { streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) sink.onDispose { - logger.debug("Disposing read components") + logger.trace { "Disposing read components" } streamBuffer.discardReadComponents() } sink.onRequest { requestedFrameCount -> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index a576dc65..abebff3d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -84,7 +84,7 @@ internal class WireFrameSink( try { decoder.decodeFirst(streamBuffer) } catch (ex: MissingWireFrameBytesException) { - logger.debug { "${ex.message} - waiting for more data" } + logger.trace { "${ex.message} - waiting for more data" } null } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 8d01c075..67a7d6f2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.time.Duration /** * @author Piotr Jaszczyk @@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration data class ServerConfiguration( val port: Int, val configurationUrl: String, - val securityConfiguration: SecurityConfiguration) + val securityConfiguration: SecurityConfiguration, + val idleTimeout: Duration, + val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bc030587..e9cd5f3f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.model +import arrow.core.Option import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain data class Routing(val routes: List) { - fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) } + fun routeFor(commonHeader: CommonEventHeader): Option = + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { -- cgit 1.2.3-korg