summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Wielebski <piotr.wielebski@nokia.com>2019-01-22 13:59:22 +0000
committerGerrit Code Review <gerrit@onap.org>2019-01-22 13:59:22 +0000
commit769acc65bb4709fa36ff3cbe16994eaf88792f74 (patch)
treeb8bfefbd399f449b393142014b398568b5e9cf39
parentda7ef3ec63a7393d1b48c4784ca8f8029176f935 (diff)
parentd7532776b9d608632b91a6c658fcd72ca7c70d64 (diff)
Merge "Close KafkaSender when handling SIGINT"
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt11
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt2
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt18
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt24
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt8
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt1
-rw-r--r--sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt11
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt45
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt4
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml4
-rw-r--r--sources/hv-collector-utils/pom.xml4
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt40
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt11
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt9
18 files changed, 174 insertions, 48 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index e4a73947..6a6e73fb 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.Closeable
import reactor.core.publisher.Flux
interface Sink {
@@ -42,16 +43,8 @@ interface Metrics {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-interface SinkProvider {
+interface SinkProvider: Closeable {
operator fun invoke(ctx: ClientContext): Sink
-
- companion object {
- fun just(sink: Sink): SinkProvider =
- object : SinkProvider {
- override fun invoke(
- ctx: ClientContext): Sink = sink
- }
- }
}
interface ConfigurationProvider {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 5584d61d..5c64c70b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,18 +22,19 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import java.util.*
interface Collector {
fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = (ClientContext) -> Option<Collector>
+interface CollectorProvider : Closeable {
+ operator fun invoke(ctx: ClientContext): Option<Collector>
+}
interface Server {
fun start(): IO<ServerHandle>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 861065c1..535d1baa 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.factory
+import arrow.core.Option
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
@@ -60,8 +62,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
.subscribe(config::set)
- return { ctx: ClientContext ->
- config.getOption().map { createVesHvCollector(it, ctx) }
+ return object : CollectorProvider {
+ override fun invoke(ctx: ClientContext): Option<Collector> =
+ config.getOption().map { createVesHvCollector(it, ctx) }
+
+ override fun close() = sinkProvider.close()
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 5e7d9f57..aa76ce3e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+import arrow.effects.IO
import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
@@ -30,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
@@ -47,7 +50,13 @@ internal class KafkaSinkProvider internal constructor(
override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+ override fun close() = IO {
+ kafkaSender.close()
+ logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+ }
+
companion object {
+ private val logger = Logger(KafkaSinkProvider::class)
private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
private const val BUFFER_MEMORY_MULTIPLIER = 32
private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 725622f7..c76233f0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -63,6 +64,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
.addressSupplier { serverConfig.serverListenAddress }
.configureSsl()
.handle(this::handleConnection)
+ .doOnUnbound {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ collectorProvider.close().unsafeRunSync()
+ }
.let { NettyServerHandle(it.bindNow()) }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
index 2407eced..a72ec034 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
@@ -30,7 +30,7 @@ import java.util.*
*/
object ServiceContext {
val instanceId = UUID.randomUUID().toString()
- val serverFqdn = getHost().hostName
+ val serverFqdn = getHost().hostName!!
val mdc = mapOf(
OnapMdc.INSTANCE_ID to instanceId,
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index f23154a4..2db6a152 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -20,6 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.syntax.collections.tail
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -28,6 +30,9 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass
+import reactor.kafka.sender.KafkaSender
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -61,5 +66,18 @@ internal object KafkaSinkProviderTest : Spek({
}
}
}
+
+ given("dummy KafkaSender") {
+ val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
+ val cut = KafkaSinkProvider(kafkaSender)
+
+ on("close") {
+ cut.close().unsafeRunSync()
+
+ it("should close KafkaSender") {
+ verify(kafkaSender).close()
+ }
+ }
+ }
}
})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index c3e4a581..30661e84 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.tests.component
import arrow.core.getOrElse
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
@@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.*
import reactor.core.publisher.Flux
import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) {
+class Sut(sink: Sink = StoringSink()): AutoCloseable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
+ val sinkProvider = DummySinkProvider(sink)
private val collectorFactory = CollectorFactory(
configurationProvider,
- SinkProvider.just(sink),
+ sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
healthStateProvider)
@@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) {
throw IllegalStateException("Collector not available.")
}
+ override fun close() {
+ collectorProvider.close().unsafeRunSync()
+ }
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
+class DummySinkProvider(private val sink: Sink) : SinkProvider {
+ private val active = AtomicBoolean(true)
+
+ override fun invoke(ctx: ClientContext) = sink
+
+ override fun close() = IO {
+ active.set(false)
+ }
+
+ val closed get() = !active.get()
+
+}
+
private val timeout = Duration.ofSeconds(10)
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 75e7cf0e..ed46b119 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -61,6 +61,14 @@ object VesHvSpecification : Spek({
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should close sink when closing collector provider") {
+ val (sut, _) = vesHvWithStoringSink()
+
+ sut.close()
+
+ assertThat(sut.sinkProvider.closed).isTrue()
+ }
}
describe("Memory management") {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index b4ce6499..2e7065b2 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
-import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index 7aade34b..32486009 100644
--- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.netty.DisposableServer
import reactor.netty.http.server.HttpServer
import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerResponse
@@ -48,7 +49,10 @@ class HealthCheckApiServer(private val healthState: HealthState,
fun start(): IO<ServerHandle> = IO {
healthState().subscribe(healthDescription::set)
val ctx = HttpServer.create()
- .tcpConfiguration { it.addressSupplier { listenAddress } }
+ .tcpConfiguration {
+ it.addressSupplier { listenAddress }
+ .doOnUnbound { logClose() }
+ }
.route { routes ->
routes.get("/health/ready", ::readinessHandler)
routes.get("/health/alive", ::livenessHandler)
@@ -70,8 +74,13 @@ class HealthCheckApiServer(private val healthState: HealthState,
private fun monitoringHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
resp.sendString(monitoring.lastStatus())
+ private fun logClose() {
+ logger.info { "Health Check API closed" }
+ }
+
companion object {
private val logger = Logger(HealthCheckApiServer::class)
+
}
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 16da3721..d865bcf5 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -26,7 +26,11 @@ import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -37,24 +41,29 @@ private val logger = Logger("$VESHV_PACKAGE.main")
private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
fun main(args: Array<String>) =
- ArgVesHvConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startAndAwaitServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- { logger.info { "Finished" } }
- )
+ ArgVesHvConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startAndAwaitServers)
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+ ExitFailure(1)
+ },
+ { logger.debug(ServiceContext::mdc) { "Finished" } }
+ )
private fun startAndAwaitServers(config: ServerConfiguration) =
- IO.monad().binding {
- Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
- logger.info { "Using configuration: $config" }
- HealthCheckServer.start(config).bind()
- VesServer.start(config).bind().run {
- registerShutdownHook(shutdown()).bind()
- await().bind()
+ IO.monad().binding {
+ Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
+ logger.info { "Using configuration: $config" }
+ val healthCheckServerHandle = HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind().let { handle ->
+ registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
+ handle.await().bind()
+ }
+ }.fix()
+
+private fun closeServers(vararg handles: ServerHandle): IO<Unit> =
+ Closeable.closeAll(handles.asIterable()).then {
+ logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
- }.fix()
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
index 13b0bc7b..3d1a2a21 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -21,7 +21,9 @@ package org.onap.dcae.collectors.veshv.main.servers
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
@@ -31,7 +33,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
abstract class ServerStarter {
fun start(config: ServerConfiguration): IO<ServerHandle> =
startServer(config)
- .map { logger.info { serverStartedMessage(it) }; it }
+ .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
protected abstract fun serverStartedMessage(handle: ServerHandle): String
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 5ce34800..40f3c8a0 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -90,9 +90,9 @@
</appender>
<logger name="reactor.netty" level="WARN"/>
- <logger name="io.netty" level="DEBUG"/>
+ <logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
- <logger name="org.apache.kafka" level="WARN"/>
+ <logger name="org.apache.kafka" level="INFO"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml
index 5bd24729..6ce74bdb 100644
--- a/sources/hv-collector-utils/pom.xml
+++ b/sources/hv-collector-utils/pom.xml
@@ -82,6 +82,10 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-instances</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
new file mode 100644
index 00000000..00b814cc
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since January 2019
+ */
+interface Closeable {
+ fun close(): IO<Unit> = IO.unit
+
+ companion object {
+ fun closeAll(closeables: Iterable<Closeable>) =
+ IO.monadError().binding {
+ closeables.forEach { it.close().bind() }
+ }.fix()
+ }
+}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
index 3c2c64ac..290ef72c 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
@@ -23,6 +23,9 @@ import arrow.core.Either
import arrow.core.Left
import arrow.core.Right
import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monadError.monadError
+import arrow.typeclasses.binding
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.system.exitProcess
@@ -46,7 +49,7 @@ object ExitSuccess : ExitCode() {
data class ExitFailure(override val code: Int) : ExitCode()
-fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
+inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
fun IO<Any>.unit() = map { Unit }
@@ -66,3 +69,9 @@ fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
{ Flux.just<T>(it) }
)
}
+
+inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> =
+ map {
+ block(it)
+ it
+ }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index b8784c64..5b582ed5 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.netty.DisposableServer
import java.time.Duration
@@ -28,8 +27,7 @@ import java.time.Duration
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
+abstract class ServerHandle(val host: String, val port: Int) : Closeable {
abstract fun await(): IO<Unit>
}
@@ -38,10 +36,8 @@ abstract class ServerHandle(val host: String, val port: Int) {
* @since August 2018
*/
class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun shutdown() = IO {
- logger.info { "Graceful shutdown" }
+ override fun close() = IO {
ctx.disposeNow(SHUTDOWN_TIMEOUT)
- logger.info { "Server disposed" }
}
override fun await() = IO<Unit> {
@@ -49,7 +45,6 @@ class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.ho
}
companion object {
- val logger = Logger(NettyServerHandle::class)
private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
}
}