summaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-14 09:48:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 09:49:02 +0200
commit67689405071acdad2b26d5112b3662605e474ce9 (patch)
tree3e945129934d5721922fdabf229b0d61b772dfdb /hv-collector-core
parente7987b7a660060746d5f49e1ec90b1ff90fcf55a (diff)
Various improvements
* Kotlin upgrade * Monad usage on APIs * Idle timeout * Simulator enhancements Closes ONAP-390 Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/pom.xml239
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt73
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt4
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt17
13 files changed, 228 insertions, 159 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index a372fb22..18657316 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -19,130 +19,135 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
- <parent>
- <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
- <artifactId>ves-hv-collector</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
+ <artifactId>ves-hv-collector</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
- <artifactId>hv-collector-core</artifactId>
- <description>VES HighVolume Collector :: Core</description>
+ <artifactId>hv-collector-core</artifactId>
+ <description>VES HighVolume Collector :: Core</description>
- <properties>
- <skipAnalysis>false</skipAnalysis>
- </properties>
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
- <build>
- <plugins>
- <plugin>
- <artifactId>kotlin-maven-plugin</artifactId>
- <groupId>org.jetbrains.kotlin</groupId>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <groupId>org.apache.maven.plugins</groupId>
- </plugin>
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
- <dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-utils</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-reflect</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.addons</groupId>
- <artifactId>reactor-extra</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.ipc</groupId>
- <artifactId>reactor-netty</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-tcnative-boringssl-static</artifactId>
- <scope>runtime</scope>
- <classifier>${os.detected.classifier}</classifier>
- </dependency>
- <dependency>
- <groupId>javax.json</groupId>
- <artifactId>javax.json-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- <scope>runtime</scope>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-domain</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.ipc</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <scope>runtime</scope>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>runtime</scope>
+ </dependency>
- <dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+
+ <dependency>
+ <groupId>com.nhaarman</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ed686fe8..d6158481 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -32,9 +33,10 @@ interface Collector {
typealias CollectorProvider = () -> Collector
interface Server {
- fun start(): Mono<Void>
+ fun start(): IO<ServerHandle>
}
-interface ServerFactory {
- fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server
+abstract class ServerHandle(val host: String, val port: Int) {
+ abstract fun shutdown(): IO<Unit>
+ abstract fun await(): IO<Unit>
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index f3f0a891..cee658b6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -19,10 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.Routing
import org.onap.dcae.collectors.veshv.model.VesMessage
class Router(private val routing: Routing) {
- fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message)
+ fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ routing.routeFor(message.header).map { it(message) }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 222eaefa..033095ad 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
@@ -67,7 +68,10 @@ internal class VesHvCollector(
wireChunkDecoder.release()
}
- private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+ private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
+ mapper(input).fold(
+ { Mono.empty() },
+ { Mono.just(it) })
companion object {
val logger = Logger(VesHvCollector::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index a5c41046..5f4bf354 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider {
override fun invoke(config: CollectorConfiguration): Sink {
return object : Sink {
- private val logger = Logger(LoggingSinkProvider::class)
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
@@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider {
companion object {
const val INFO_LOGGING_FREQ = 100_000
+ private val logger = Logger(LoggingSinkProvider::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 0a548a52..f8fa72a6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,27 +20,38 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+ private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
- return sender.send(records)
+ val result = sender.send(records)
.doOnNext(::logException)
.filter(::isSuccessful)
.map { it.correlationMetadata() }
+
+ return if (logger.traceEnabled) {
+ result.doOnNext(::logSentMessage)
+ } else {
+ result
+ }
}
private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun logSentMessage(sentMsg: RoutedMessage) {
+ logger.trace {
+ val msgNum = sentMessages.incrementAndGet()
+ "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
companion object {
val logger = Logger(KafkaSink::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
index 9753d9e5..4e9932cc 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class ProtobufSerializer :Serializer<MessageLite> {
+class ProtobufSerializer : Serializer<MessageLite> {
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
// no configuration
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 65b3b29e..0426ceb1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
@@ -28,7 +30,9 @@ import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
+import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
+import java.time.Duration
import java.util.function.BiFunction
/**
@@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val sslContextFactory: SslContextFactory,
private val collectorProvider: CollectorProvider) : Server {
- override fun start(): Mono<Void> {
- logger.info { "Listening on port ${serverConfig.port}" }
- return Mono.defer {
- val nettyContext = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u ->
- handleConnection(t, u)
- })
- Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() }
- }
+ override fun start(): IO<ServerHandle> = IO {
+ val ctx = TcpServer.builder()
+ .options(this::configureServer)
+ .build()
+ .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
+ handleConnection(input)
+ })
+ NettyServerHandle(ctx)
}
private fun configureServer(opts: ServerOptions.Builder<*>) {
@@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration))
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
- logger.debug("Got connection")
- nettyOutbound.alloc()
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
+ logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+
+ val dataStream = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
- val sendHello = nettyOutbound
- .options { it.flushOnEach() }
- .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
- .then()
+ return collectorProvider()
+ .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ }
- val handleIncomingMessages = collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
+ private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ onReadIdle(timeout.toMillis()) {
+ logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
+ context().channel().close().addListener {
- return sendHello.then(handleIncomingMessages)
+ if (it.isSuccess)
+ logger.debug { "Client disconnected because of idle timeout" }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+ return this
+ }
+
+ private fun NettyInbound.logConnectionClosed(): NettyInbound {
+ context().onClose {
+ logger.info("Connection from ${remoteAddress()} has been closed")
+ }
+ return this
}
+
+ private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+ override fun shutdown() = IO {
+ ctx.shutdown()
+ }
+
+ override fun await() = IO<Unit> {
+ ctx.context.channel().closeFuture().sync()
+ }
+ }
+
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
index 34a8b928..b788f511 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
@@ -56,7 +56,7 @@ internal class StreamBufferEmitter(
else -> {
streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
sink.onDispose {
- logger.debug("Disposing read components")
+ logger.trace { "Disposing read components" }
streamBuffer.discardReadComponents()
}
sink.onRequest { requestedFrameCount ->
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index a576dc65..abebff3d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -84,7 +84,7 @@ internal class WireFrameSink(
try {
decoder.decodeFirst(streamBuffer)
} catch (ex: MissingWireFrameBytesException) {
- logger.debug { "${ex.message} - waiting for more data" }
+ logger.trace { "${ex.message} - waiting for more data" }
null
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 8d01c075..67a7d6f2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
data class ServerConfiguration(
val port: Int,
val configurationUrl: String,
- val securityConfiguration: SecurityConfiguration)
+ val securityConfiguration: SecurityConfiguration,
+ val idleTimeout: Duration,
+ val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index bc030587..e9cd5f3f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -19,12 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.model
+import arrow.core.Option
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
data class Routing(val routes: List<Route>) {
- fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
+ fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+ Option.fromNullable(routes.find { it.applies(commonHeader) })
}
data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index c852f5f4..599a9d40 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -19,12 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.None
+import arrow.core.Some
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -61,15 +64,15 @@ object RouterTest : Spek({
}
it("should be routed to proper partition") {
- assertThat(result?.partition).isEqualTo(2)
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
}
it("should be routed to proper topic") {
- assertThat(result?.topic).isEqualTo("ves_rtpm")
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
}
it("should be routed with a given message") {
- assertThat(result?.message).isSameAs(message)
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
}
}
@@ -82,15 +85,15 @@ object RouterTest : Spek({
}
it("should be routed to proper partition") {
- assertThat(result?.partition).isEqualTo(0)
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
}
it("should be routed to proper topic") {
- assertThat(result?.topic).isEqualTo("ves_trace")
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
}
it("should be routed with a given message") {
- assertThat(result?.message).isSameAs(message)
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
}
}
@@ -99,7 +102,7 @@ object RouterTest : Spek({
val result = cut.findDestination(message)
it("should not have route available") {
- assertThat(result).isNull()
+ assertThat(result).isEqualTo(None)
}
}
}