aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 11:43:18 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 14:30:32 +0100
commitd7532776b9d608632b91a6c658fcd72ca7c70d64 (patch)
tree0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-core/src
parent4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff)
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt11
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt2
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt18
7 files changed, 46 insertions, 15 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index e4a73947..6a6e73fb 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.Closeable
import reactor.core.publisher.Flux
interface Sink {
@@ -42,16 +43,8 @@ interface Metrics {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-interface SinkProvider {
+interface SinkProvider: Closeable {
operator fun invoke(ctx: ClientContext): Sink
-
- companion object {
- fun just(sink: Sink): SinkProvider =
- object : SinkProvider {
- override fun invoke(
- ctx: ClientContext): Sink = sink
- }
- }
}
interface ConfigurationProvider {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 5584d61d..5c64c70b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,18 +22,19 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import java.util.*
interface Collector {
fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = (ClientContext) -> Option<Collector>
+interface CollectorProvider : Closeable {
+ operator fun invoke(ctx: ClientContext): Option<Collector>
+}
interface Server {
fun start(): IO<ServerHandle>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 861065c1..535d1baa 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.factory
+import arrow.core.Option
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
@@ -60,8 +62,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
.subscribe(config::set)
- return { ctx: ClientContext ->
- config.getOption().map { createVesHvCollector(it, ctx) }
+ return object : CollectorProvider {
+ override fun invoke(ctx: ClientContext): Option<Collector> =
+ config.getOption().map { createVesHvCollector(it, ctx) }
+
+ override fun close() = sinkProvider.close()
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 5e7d9f57..aa76ce3e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import arrow.effects.IO
import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
@@ -30,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
@@ -47,7 +50,13 @@ internal class KafkaSinkProvider internal constructor(
override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+ override fun close() = IO {
+ kafkaSender.close()
+ logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+ }
+
companion object {
+ private val logger = Logger(KafkaSinkProvider::class)
private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
private const val BUFFER_MEMORY_MULTIPLIER = 32
private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 725622f7..c76233f0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -63,6 +64,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
.addressSupplier { serverConfig.serverListenAddress }
.configureSsl()
.handle(this::handleConnection)
+ .doOnUnbound {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ collectorProvider.close().unsafeRunSync()
+ }
.let { NettyServerHandle(it.bindNow()) }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
index 2407eced..a72ec034 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
@@ -30,7 +30,7 @@ import java.util.*
*/
object ServiceContext {
val instanceId = UUID.randomUUID().toString()
- val serverFqdn = getHost().hostName
+ val serverFqdn = getHost().hostName!!
val mdc = mapOf(
OnapMdc.INSTANCE_ID to instanceId,
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index f23154a4..2db6a152 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -20,6 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.syntax.collections.tail
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -28,6 +30,9 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass
+import reactor.kafka.sender.KafkaSender
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -61,5 +66,18 @@ internal object KafkaSinkProviderTest : Spek({
}
}
}
+
+ given("dummy KafkaSender") {
+ val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
+ val cut = KafkaSinkProvider(kafkaSender)
+
+ on("close") {
+ cut.close().unsafeRunSync()
+
+ it("should close KafkaSender") {
+ verify(kafkaSender).close()
+ }
+ }
+ }
}
})