summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-02 15:40:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-03 08:51:03 +0200
commit302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch)
treec9b716c649deb8b14d9ace320b3f35ed22604d0e
parent6a00e38550fd1745c3377da2099bf5a615f69053 (diff)
Fix shutting down when new config received bug
When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rw-r--r--development/configuration/local.json20
-rw-r--r--sources/hv-collector-commandline/pom.xml4
-rw-r--r--sources/hv-collector-core/pom.xml4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt)4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt25
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt49
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt21
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt10
-rw-r--r--sources/hv-collector-main/pom.xml4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt29
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml1
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt9
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt12
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt30
21 files changed, 151 insertions, 113 deletions
diff --git a/development/configuration/local.json b/development/configuration/local.json
new file mode 100644
index 00000000..a1a8b533
--- /dev/null
+++ b/development/configuration/local.json
@@ -0,0 +1,20 @@
+{
+ "logLevel": "DEBUG",
+ "server": {
+ "listenPort": 8061,
+ "idleTimeoutSec": 60,
+ "maxPayloadSizeBytes": 1048576
+ },
+ "cbs": {
+ "firstRequestDelaySec": 10,
+ "requestIntervalSec": 5
+ },
+ "security": {
+ "keys": {
+ "keyStoreFile": "development/ssl/server.p12",
+ "keyStorePassword": "onaponap",
+ "trustStoreFile": "development/ssl/trust.p12",
+ "trustStorePassword": "onaponap"
+ }
+ }
+} \ No newline at end of file
diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml
index 7f8643de..078a3cb5 100644
--- a/sources/hv-collector-commandline/pom.xml
+++ b/sources/hv-collector-commandline/pom.xml
@@ -41,10 +41,6 @@
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index e7134e18..e15592f3 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -97,10 +97,6 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ba0a9eee..0039ef62 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -36,5 +35,5 @@ interface CollectorProvider : Closeable {
}
interface Server {
- fun start(): IO<ServerHandle>
+ fun start(): Mono<ServerHandle>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 1c79abd2..8fb4e80d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration,
}
private fun createVesHvCollector(ctx: ClientContext): Collector =
- VesHvCollector(
+ HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index 618b818f..7d8f0cb1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -44,7 +44,7 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class VesHvCollector(
+internal class HvVesCollector(
private val clientContext: ClientContext,
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
@@ -116,6 +116,6 @@ internal class VesHvCollector(
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
- private val logger = Logger(VesHvCollector::class)
+ private val logger = Logger(HvVesCollector::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index 6e2e20f7..b03b89e1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing,
fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .fold({
- metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
- logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
- logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
- Flux.empty<Route>()
- }, {
- logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
- Flux.just(it)
- })
+ .fold({ routeNotFound(message) }, { routeFound(message, it) })
.flatMap {
val sinkTopic = it.sink.topicName()
messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
}
+ private fun routeNotFound(message: VesMessage): Flux<Route> {
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
+ return Flux.empty<Route>()
+ }
+
+ private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
+ logger.trace(ctx::fullMdc) {
+ "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
+ }
+ return Flux.just(route)
+ }
+
+
private fun routeFor(header: CommonEventHeader) =
routing.find { it.domain == header.domain }.toOption()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 2ce0f42f..7b726ab4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -19,16 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 7a498652..86980832 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
@@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
import reactor.kafka.sender.KafkaSender
import java.util.Collections.synchronizedMap
@@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider {
}
}
- override fun close() = IO {
- messageSinks.values.forEach { it.close() }
- logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
- }
+ override fun close(): Mono<Void> =
+ Flux.fromIterable(messageSinks.values)
+ .publishOn(Schedulers.elastic())
+ .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close)
+ .then()
+ .doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
+ }
companion object {
private val logger = Logger(KafkaSinkProvider::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 3e19414d..a208384a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
@@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private val collectorProvider: CollectorProvider,
private val metrics: Metrics) : Server {
- override fun start(): IO<ServerHandle> = IO {
- TcpServer.create()
- .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
- .configureSsl()
- .handle(this::handleConnection)
- .doOnUnbound {
- logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
- collectorProvider.close().unsafeRunSync()
- }
- .let { NettyServerHandle(it.bindNow()) }
- }
+ override fun start(): Mono<ServerHandle> =
+ Mono.defer {
+ TcpServer.create()
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
+ .configureSsl()
+ .handle(this::handleConnection)
+ .bind()
+ .map {
+ NettyServerHandle(it, closeAction())
+ }
+ }
+
+ private fun closeAction(): Mono<Void> =
+ collectorProvider.close().doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ }
+
private fun TcpServer.configureSsl() =
sslContext
@@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
withNewClientContextFrom(nettyInbound, nettyOutbound)
{ clientContext ->
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc) { "Client connection request received" }
clientContext.clientAddress
.map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
@@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
- logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
val collector = collectorProvider(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}
private fun Collector.handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- connection
- .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
- .logConnectionClosed(clientContext)
- }.run {
- handleConnection(nettyInbound.createDataStream())
- }
+ nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ connection
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ handleConnection(nettyInbound.createDataStream())
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index b735138d..ca9d28ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -19,17 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.wire
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Flux.defer
import reactor.core.publisher.SynchronousSink
@@ -63,26 +62,22 @@ internal class WireChunkDecoder(
private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
- .unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err ->
when (err) {
- is InvalidWireFrame -> IO {
+ is InvalidWireFrame ->
next.error(WireFrameException(err))
- }
- is MissingWireFrameBytes -> IO {
+ is MissingWireFrameBytes -> {
logEndOfData()
next.complete()
}
}
}
- private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame ->
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
private fun logIncomingMessage(wire: ByteBuf) {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index f79c2e46..95b9159e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
if (sinkInitialized.get()) {
sink.close()
} else {
- IO.unit
+ Mono.empty()
}
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2430c74f..d845f7c4 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -67,7 +67,7 @@ object VesHvSpecification : Spek({
// just connecting should not create sink
sut.handleConnection()
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isFalse()
@@ -80,7 +80,7 @@ object VesHvSpecification : Spek({
sut.handleConnection(vesWireFrameMessage(PERF3GPP))
// when
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isTrue()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 160defdb..f1b1ba2d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
@@ -50,12 +51,9 @@ class StoringSink : Sink {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
- /*
- * TOD0: if the code would look like:
- * ```IO { active.set(false) }```
- * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
- */
- override fun close() = active.set(false).run { IO.unit }
+ override fun close(): Mono<Void> = Mono.fromRunnable {
+ active.set(false)
+ }
}
/**
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
index edbdaa36..57f21a66 100644
--- a/sources/hv-collector-main/pom.xml
+++ b/sources/hv-collector-main/pom.xml
@@ -97,10 +97,6 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc207ef8..8b0a38bb 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
+import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
@@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main")
private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
+private val maxCloseTime = Duration.ofSeconds(10)
fun main(args: Array<String>) {
val configStateListener = object : ConfigurationStateListener {
@@ -60,30 +63,36 @@ fun main(args: Array<String>) {
logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
- .doOnNext(::startServer)
+ .flatMap(::startServer)
.doOnError(::logServerStartFailed)
.then()
.block()
}
-private fun startServer(config: HvVesConfiguration) {
- stopRunningServer()
+private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
+ stopRunningServer()
+ .timeout(maxCloseTime)
+ .then(deferredVesServer(config))
+ .doOnNext {
+ registerShutdownHook { shutdownGracefully(it) }
+ hvVesServer.set(it)
+ }
+
+private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
logger.debug(ServiceContext::mdc) { "Configuration: $config" }
-
- VesServer.start(config).let {
- registerShutdownHook { shutdownGracefully(it) }
- hvVesServer.set(it)
- }
+ VesServer.start(config)
}
-private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
+private fun stopRunningServer() = Mono.defer {
+ hvVesServer.get()?.close() ?: Mono.empty()
+}
internal fun shutdownGracefully(serverHandle: ServerHandle,
healthState: HealthState = HealthState.INSTANCE) {
logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
healthState.changeState(HealthDescription.SHUTTING_DOWN)
- serverHandle.close().unsafeRunSync()
+ serverHandle.close().block(maxCloseTime)
logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index c079cc59..fc4d8662 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,11 +39,10 @@ object VesServer {
private val logger = Logger(VesServer::class)
- fun start(config: HvVesConfiguration): ServerHandle =
+ fun start(config: HvVesConfiguration): Mono<ServerHandle> =
createVesServer(config)
.start()
- .then(::logServerStarted)
- .unsafeRunSync()
+ .doOnNext(::logServerStarted)
private fun createVesServer(config: HvVesConfiguration): Server =
initializeCollectorFactory(config)
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 21c1fa31..539f7c2c 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -91,6 +91,7 @@
</appender>
<logger name="reactor.netty" level="WARN"/>
+ <logger name="reactor.netty.tcp.TcpServer" level="OFF"/>
<logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
<logger name="org.apache.kafka" level="INFO"/>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index d8de9f25..a967fba0 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.inOrder
import com.nhaarman.mockitokotlin2.mock
@@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
internal object MainTest : Spek({
describe("closeServer shutdown hook") {
given("server handles and health state") {
- val handle = mock<ServerHandle>()
+ val handle: ServerHandle = mock()
var closed = false
- val handleClose = IO {
- closed = true
- }
- whenever(handle.close()).thenReturn(handleClose)
+ whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true })
val healthState: HealthState = mock()
on("shutdownGracefully") {
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
index 00b814cc..ec654b32 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
@@ -19,22 +19,18 @@
*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
interface Closeable {
- fun close(): IO<Unit> = IO.unit
+ fun close(): Mono<Void> = Mono.empty()
companion object {
fun closeAll(closeables: Iterable<Closeable>) =
- IO.monadError().binding {
- closeables.forEach { it.close().bind() }
- }.fix()
+ Flux.fromIterable(closeables).flatMap(Closeable::close).then()
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index 5b582ed5..670ab4ac 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,8 +20,9 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable {
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun close() = IO {
- ctx.disposeNow(SHUTDOWN_TIMEOUT)
- }
+class NettyServerHandle(private val ctx: DisposableServer,
+ private val closeAction: Mono<Void> = Mono.empty())
+ : ServerHandle(ctx.host(), ctx.port()) {
+
+ override fun close(): Mono<Void> =
+ Mono.just(ctx)
+ .filter { !it.isDisposed }
+ .flatMap {
+ closeAction.thenReturn(it)
+ }
+ .then(dispose())
+
+ private fun dispose(): Mono<Void> =
+ Mono.create { callback ->
+ logger.debug { "About to dispose NettyServer" }
+ ctx.dispose()
+ ctx.onDispose {
+ logger.debug { "Netty server disposed" }
+ callback.success()
+ }
+ }
override fun await() = IO<Unit> {
ctx.channel().closeFuture().sync()
}
companion object {
- private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
+ private val logger = Logger(NettyServerHandle::class)
}
}