aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-14 09:48:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 09:49:02 +0200
commit67689405071acdad2b26d5112b3662605e474ce9 (patch)
tree3e945129934d5721922fdabf229b0d61b772dfdb /hv-collector-core/src/main/kotlin
parente7987b7a660060746d5f49e1ec90b1ff90fcf55a (diff)
Various improvements
* Kotlin upgrade * Monad usage on APIs * Idle timeout * Simulator enhancements Closes ONAP-390 Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt73
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt4
11 files changed, 96 insertions, 35 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ed686fe8..d6158481 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -32,9 +33,10 @@ interface Collector {
typealias CollectorProvider = () -> Collector
interface Server {
- fun start(): Mono<Void>
+ fun start(): IO<ServerHandle>
}
-interface ServerFactory {
- fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server
+abstract class ServerHandle(val host: String, val port: Int) {
+ abstract fun shutdown(): IO<Unit>
+ abstract fun await(): IO<Unit>
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index f3f0a891..cee658b6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -19,10 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.Routing
import org.onap.dcae.collectors.veshv.model.VesMessage
class Router(private val routing: Routing) {
- fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message)
+ fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ routing.routeFor(message.header).map { it(message) }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 222eaefa..033095ad 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
@@ -67,7 +68,10 @@ internal class VesHvCollector(
wireChunkDecoder.release()
}
- private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+ private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
+ mapper(input).fold(
+ { Mono.empty() },
+ { Mono.just(it) })
companion object {
val logger = Logger(VesHvCollector::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index a5c41046..5f4bf354 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider {
override fun invoke(config: CollectorConfiguration): Sink {
return object : Sink {
- private val logger = Logger(LoggingSinkProvider::class)
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
@@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider {
companion object {
const val INFO_LOGGING_FREQ = 100_000
+ private val logger = Logger(LoggingSinkProvider::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 0a548a52..f8fa72a6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,27 +20,38 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+ private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
- return sender.send(records)
+ val result = sender.send(records)
.doOnNext(::logException)
.filter(::isSuccessful)
.map { it.correlationMetadata() }
+
+ return if (logger.traceEnabled) {
+ result.doOnNext(::logSentMessage)
+ } else {
+ result
+ }
}
private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun logSentMessage(sentMsg: RoutedMessage) {
+ logger.trace {
+ val msgNum = sentMessages.incrementAndGet()
+ "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
companion object {
val logger = Logger(KafkaSink::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
index 9753d9e5..4e9932cc 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class ProtobufSerializer :Serializer<MessageLite> {
+class ProtobufSerializer : Serializer<MessageLite> {
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
// no configuration
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 65b3b29e..0426ceb1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
@@ -28,7 +30,9 @@ import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
+import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
+import java.time.Duration
import java.util.function.BiFunction
/**
@@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val sslContextFactory: SslContextFactory,
private val collectorProvider: CollectorProvider) : Server {
- override fun start(): Mono<Void> {
- logger.info { "Listening on port ${serverConfig.port}" }
- return Mono.defer {
- val nettyContext = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u ->
- handleConnection(t, u)
- })
- Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() }
- }
+ override fun start(): IO<ServerHandle> = IO {
+ val ctx = TcpServer.builder()
+ .options(this::configureServer)
+ .build()
+ .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
+ handleConnection(input)
+ })
+ NettyServerHandle(ctx)
}
private fun configureServer(opts: ServerOptions.Builder<*>) {
@@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration))
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
- logger.debug("Got connection")
- nettyOutbound.alloc()
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
+ logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+
+ val dataStream = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
- val sendHello = nettyOutbound
- .options { it.flushOnEach() }
- .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
- .then()
+ return collectorProvider()
+ .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ }
- val handleIncomingMessages = collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
+ private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ onReadIdle(timeout.toMillis()) {
+ logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
+ context().channel().close().addListener {
- return sendHello.then(handleIncomingMessages)
+ if (it.isSuccess)
+ logger.debug { "Client disconnected because of idle timeout" }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+ return this
+ }
+
+ private fun NettyInbound.logConnectionClosed(): NettyInbound {
+ context().onClose {
+ logger.info("Connection from ${remoteAddress()} has been closed")
+ }
+ return this
}
+
+ private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+ override fun shutdown() = IO {
+ ctx.shutdown()
+ }
+
+ override fun await() = IO<Unit> {
+ ctx.context.channel().closeFuture().sync()
+ }
+ }
+
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
index 34a8b928..b788f511 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
@@ -56,7 +56,7 @@ internal class StreamBufferEmitter(
else -> {
streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
sink.onDispose {
- logger.debug("Disposing read components")
+ logger.trace { "Disposing read components" }
streamBuffer.discardReadComponents()
}
sink.onRequest { requestedFrameCount ->
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index a576dc65..abebff3d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -84,7 +84,7 @@ internal class WireFrameSink(
try {
decoder.decodeFirst(streamBuffer)
} catch (ex: MissingWireFrameBytesException) {
- logger.debug { "${ex.message} - waiting for more data" }
+ logger.trace { "${ex.message} - waiting for more data" }
null
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 8d01c075..67a7d6f2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
data class ServerConfiguration(
val port: Int,
val configurationUrl: String,
- val securityConfiguration: SecurityConfiguration)
+ val securityConfiguration: SecurityConfiguration,
+ val idleTimeout: Duration,
+ val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index bc030587..e9cd5f3f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -19,12 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.model
+import arrow.core.Option
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
data class Routing(val routes: List<Route>) {
- fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
+ fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+ Option.fromNullable(routes.find { it.applies(commonHeader) })
}
data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {