diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-14 09:48:46 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 09:49:02 +0200 |
commit | 67689405071acdad2b26d5112b3662605e474ce9 (patch) | |
tree | 3e945129934d5721922fdabf229b0d61b772dfdb /hv-collector-core | |
parent | e7987b7a660060746d5f49e1ec90b1ff90fcf55a (diff) |
Various improvements
* Kotlin upgrade
* Monad usage on APIs
* Idle timeout
* Simulator enhancements
Closes ONAP-390
Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
13 files changed, 228 insertions, 159 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index a372fb22..18657316 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -19,130 +19,135 @@ ~ ============LICENSE_END========================================================= --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - </license> - </licenses> + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> - <parent> - <groupId>org.onap.dcaegen2.collectors.veshv</groupId> - <artifactId>ves-hv-collector</artifactId> - <version>1.0.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> + <parent> + <groupId>org.onap.dcaegen2.collectors.veshv</groupId> + <artifactId>ves-hv-collector</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> - <artifactId>hv-collector-core</artifactId> - <description>VES HighVolume Collector :: Core</description> + <artifactId>hv-collector-core</artifactId> + <description>VES HighVolume Collector :: Core</description> - <properties> - <skipAnalysis>false</skipAnalysis> - </properties> + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> - <build> - <plugins> - <plugin> - <artifactId>kotlin-maven-plugin</artifactId> - <groupId>org.jetbrains.kotlin</groupId> - </plugin> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <groupId>org.apache.maven.plugins</groupId> - </plugin> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-reflect</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-core</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.addons</groupId> - <artifactId>reactor-extra</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.ipc</groupId> - <artifactId>reactor-netty</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.kafka</groupId> - <artifactId>reactor-kafka</artifactId> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-tcnative-boringssl-static</artifactId> - <scope>runtime</scope> - <classifier>${os.detected.classifier}</classifier> - </dependency> - <dependency> - <groupId>javax.json</groupId> - <artifactId>javax.json-api</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <scope>runtime</scope> - </dependency> + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-domain</artifactId> + <version>${project.parent.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-reflect</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.ipc</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <scope>runtime</scope> + <classifier>${os.detected.classifier}</classifier> + </dependency> + <dependency> + <groupId>javax.json</groupId> + <artifactId>javax.json-api</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <scope>runtime</scope> + </dependency> - <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> - <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>test</scope> - </dependency> - </dependencies> + + <dependency> + <groupId>com.nhaarman</groupId> + <artifactId>mockito-kotlin</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ed686fe8..d6158481 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -32,9 +33,10 @@ interface Collector { typealias CollectorProvider = () -> Collector interface Server { - fun start(): Mono<Void> + fun start(): IO<ServerHandle> } -interface ServerFactory { - fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server +abstract class ServerHandle(val host: String, val port: Int) { + abstract fun shutdown(): IO<Unit> + abstract fun await(): IO<Unit> } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index f3f0a891..cee658b6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,10 +19,12 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage class Router(private val routing: Routing) { - fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message) + fun findDestination(message: VesMessage): Option<RoutedMessage> = + routing.routeFor(message.header).map { it(message) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 222eaefa..033095ad 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector @@ -67,7 +68,10 @@ internal class VesHvCollector( wireChunkDecoder.release() } - private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) + private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> = + mapper(input).fold( + { Mono.empty() }, + { Mono.just(it) }) companion object { val logger = Logger(VesHvCollector::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index a5c41046..5f4bf354 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicLong @@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider { override fun invoke(config: CollectorConfiguration): Sink { return object : Sink { - private val logger = Logger(LoggingSinkProvider::class) private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider { companion object { const val INFO_LOGGING_FREQ = 100_000 + private val logger = Logger(LoggingSinkProvider::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 0a548a52..f8fa72a6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,27 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult +import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { + private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) - return sender.send(records) + val result = sender.send(records) .doOnNext(::logException) .filter(::isSuccessful) .map { it.correlationMetadata() } + + return if (logger.traceEnabled) { + result.doOnNext(::logSentMessage) + } else { + result + } } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { @@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } } - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + private fun logSentMessage(sentMsg: RoutedMessage) { + logger.trace { + val msgNum = sentMessages.incrementAndGet() + "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" + } + } + + private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt index 9753d9e5..4e9932cc 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt @@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class ProtobufSerializer :Serializer<MessageLite> { +class ProtobufSerializer : Serializer<MessageLite> { override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { // no configuration } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 65b3b29e..0426ceb1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher @@ -28,7 +30,9 @@ import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions +import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer +import java.time.Duration import java.util.function.BiFunction /** @@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val sslContextFactory: SslContextFactory, private val collectorProvider: CollectorProvider) : Server { - override fun start(): Mono<Void> { - logger.info { "Listening on port ${serverConfig.port}" } - return Mono.defer { - val nettyContext = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u -> - handleConnection(t, u) - }) - Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() } - } + override fun start(): IO<ServerHandle> = IO { + val ctx = TcpServer.builder() + .options(this::configureServer) + .build() + .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ -> + handleConnection(input) + }) + NettyServerHandle(ctx) } private fun configureServer(opts: ServerOptions.Builder<*>) { @@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration)) } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { - logger.debug("Got connection") - nettyOutbound.alloc() + private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> { + logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + + val dataStream = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() - val sendHello = nettyOutbound - .options { it.flushOnEach() } - .sendString(Mono.just("ONAP_VES_HV/0.1\n")) - .then() + return collectorProvider() + .handleConnection(nettyInbound.context().channel().alloc(), dataStream) + } - val handleIncomingMessages = collectorProvider() - .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain()) + private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + onReadIdle(timeout.toMillis()) { + logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } + context().channel().close().addListener { - return sendHello.then(handleIncomingMessages) + if (it.isSuccess) + logger.debug { "Client disconnected because of idle timeout" } + else + logger.warn("Channel close failed", it.cause()) + } + } + return this + } + + private fun NettyInbound.logConnectionClosed(): NettyInbound { + context().onClose { + logger.info("Connection from ${remoteAddress()} has been closed") + } + return this } + + private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { + override fun shutdown() = IO { + ctx.shutdown() + } + + override fun await() = IO<Unit> { + ctx.context.channel().closeFuture().sync() + } + } + companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt index 34a8b928..b788f511 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -56,7 +56,7 @@ internal class StreamBufferEmitter( else -> { streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) sink.onDispose { - logger.debug("Disposing read components") + logger.trace { "Disposing read components" } streamBuffer.discardReadComponents() } sink.onRequest { requestedFrameCount -> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index a576dc65..abebff3d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -84,7 +84,7 @@ internal class WireFrameSink( try { decoder.decodeFirst(streamBuffer) } catch (ex: MissingWireFrameBytesException) { - logger.debug { "${ex.message} - waiting for more data" } + logger.trace { "${ex.message} - waiting for more data" } null } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 8d01c075..67a7d6f2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration data class ServerConfiguration( val port: Int, val configurationUrl: String, - val securityConfiguration: SecurityConfiguration) + val securityConfiguration: SecurityConfiguration, + val idleTimeout: Duration, + val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bc030587..e9cd5f3f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.model +import arrow.core.Option import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain data class Routing(val routes: List<Route>) { - fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) } + fun routeFor(commonHeader: CommonEventHeader): Option<Route> = + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index c852f5f4..599a9d40 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -19,12 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.None +import arrow.core.Some import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -61,15 +64,15 @@ object RouterTest : Spek({ } it("should be routed to proper partition") { - assertThat(result?.partition).isEqualTo(2) + assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) } it("should be routed to proper topic") { - assertThat(result?.topic).isEqualTo("ves_rtpm") + assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) } it("should be routed with a given message") { - assertThat(result?.message).isSameAs(message) + assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) } } @@ -82,15 +85,15 @@ object RouterTest : Spek({ } it("should be routed to proper partition") { - assertThat(result?.partition).isEqualTo(0) + assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) } it("should be routed to proper topic") { - assertThat(result?.topic).isEqualTo("ves_trace") + assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) } it("should be routed with a given message") { - assertThat(result?.message).isSameAs(message) + assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) } } @@ -99,7 +102,7 @@ object RouterTest : Spek({ val result = cut.findDestination(message) it("should not have route available") { - assertThat(result).isNull() + assertThat(result).isEqualTo(None) } } } |