summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-commandline/pom.xml4
-rw-r--r--sources/hv-collector-core/pom.xml4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt)4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt25
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt49
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt21
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt10
-rw-r--r--sources/hv-collector-main/pom.xml4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt29
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml1
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt9
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt12
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt30
20 files changed, 131 insertions, 113 deletions
diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml
index 7f8643de..078a3cb5 100644
--- a/sources/hv-collector-commandline/pom.xml
+++ b/sources/hv-collector-commandline/pom.xml
@@ -41,10 +41,6 @@
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
- <groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index e7134e18..e15592f3 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -97,10 +97,6 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ba0a9eee..0039ef62 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -36,5 +35,5 @@ interface CollectorProvider : Closeable {
}
interface Server {
- fun start(): IO<ServerHandle>
+ fun start(): Mono<ServerHandle>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 1c79abd2..8fb4e80d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration,
}
private fun createVesHvCollector(ctx: ClientContext): Collector =
- VesHvCollector(
+ HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index 618b818f..7d8f0cb1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -44,7 +44,7 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class VesHvCollector(
+internal class HvVesCollector(
private val clientContext: ClientContext,
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
@@ -116,6 +116,6 @@ internal class VesHvCollector(
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
- private val logger = Logger(VesHvCollector::class)
+ private val logger = Logger(HvVesCollector::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index 6e2e20f7..b03b89e1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing,
fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .fold({
- metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
- logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
- logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
- Flux.empty<Route>()
- }, {
- logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
- Flux.just(it)
- })
+ .fold({ routeNotFound(message) }, { routeFound(message, it) })
.flatMap {
val sinkTopic = it.sink.topicName()
messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
}
+ private fun routeNotFound(message: VesMessage): Flux<Route> {
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
+ return Flux.empty<Route>()
+ }
+
+ private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
+ logger.trace(ctx::fullMdc) {
+ "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
+ }
+ return Flux.just(route)
+ }
+
+
private fun routeFor(header: CommonEventHeader) =
routing.find { it.domain == header.domain }.toOption()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 2ce0f42f..7b726ab4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -19,16 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 7a498652..86980832 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
@@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
import reactor.kafka.sender.KafkaSender
import java.util.Collections.synchronizedMap
@@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider {
}
}
- override fun close() = IO {
- messageSinks.values.forEach { it.close() }
- logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
- }
+ override fun close(): Mono<Void> =
+ Flux.fromIterable(messageSinks.values)
+ .publishOn(Schedulers.elastic())
+ .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close)
+ .then()
+ .doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
+ }
companion object {
private val logger = Logger(KafkaSinkProvider::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 3e19414d..a208384a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
@@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private val collectorProvider: CollectorProvider,
private val metrics: Metrics) : Server {
- override fun start(): IO<ServerHandle> = IO {
- TcpServer.create()
- .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
- .configureSsl()
- .handle(this::handleConnection)
- .doOnUnbound {
- logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
- collectorProvider.close().unsafeRunSync()
- }
- .let { NettyServerHandle(it.bindNow()) }
- }
+ override fun start(): Mono<ServerHandle> =
+ Mono.defer {
+ TcpServer.create()
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
+ .configureSsl()
+ .handle(this::handleConnection)
+ .bind()
+ .map {
+ NettyServerHandle(it, closeAction())
+ }
+ }
+
+ private fun closeAction(): Mono<Void> =
+ collectorProvider.close().doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ }
+
private fun TcpServer.configureSsl() =
sslContext
@@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
withNewClientContextFrom(nettyInbound, nettyOutbound)
{ clientContext ->
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc) { "Client connection request received" }
clientContext.clientAddress
.map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
@@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
- logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
val collector = collectorProvider(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}
private fun Collector.handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- connection
- .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
- .logConnectionClosed(clientContext)
- }.run {
- handleConnection(nettyInbound.createDataStream())
- }
+ nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ connection
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ handleConnection(nettyInbound.createDataStream())
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index b735138d..ca9d28ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -19,17 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.wire
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Flux.defer
import reactor.core.publisher.SynchronousSink
@@ -63,26 +62,22 @@ internal class WireChunkDecoder(
private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
- .unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err ->
when (err) {
- is InvalidWireFrame -> IO {
+ is InvalidWireFrame ->
next.error(WireFrameException(err))
- }
- is MissingWireFrameBytes -> IO {
+ is MissingWireFrameBytes -> {
logEndOfData()
next.complete()
}
}
}
- private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame ->
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
private fun logIncomingMessage(wire: ByteBuf) {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index f79c2e46..95b9159e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
if (sinkInitialized.get()) {
sink.close()
} else {
- IO.unit
+ Mono.empty()
}
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2430c74f..d845f7c4 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -67,7 +67,7 @@ object VesHvSpecification : Spek({
// just connecting should not create sink
sut.handleConnection()
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isFalse()
@@ -80,7 +80,7 @@ object VesHvSpecification : Spek({
sut.handleConnection(vesWireFrameMessage(PERF3GPP))
// when
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isTrue()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 160defdb..f1b1ba2d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
@@ -50,12 +51,9 @@ class StoringSink : Sink {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
- /*
- * TOD0: if the code would look like:
- * ```IO { active.set(false) }```
- * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
- */
- override fun close() = active.set(false).run { IO.unit }
+ override fun close(): Mono<Void> = Mono.fromRunnable {
+ active.set(false)
+ }
}
/**
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml
index edbdaa36..57f21a66 100644
--- a/sources/hv-collector-main/pom.xml
+++ b/sources/hv-collector-main/pom.xml
@@ -97,10 +97,6 @@
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
- <artifactId>arrow-effects-instances</artifactId>
- </dependency>
- <dependency>
- <groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc207ef8..8b0a38bb 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
+import java.time.Duration
import java.util.concurrent.atomic.AtomicReference
@@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main")
private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
+private val maxCloseTime = Duration.ofSeconds(10)
fun main(args: Array<String>) {
val configStateListener = object : ConfigurationStateListener {
@@ -60,30 +63,36 @@ fun main(args: Array<String>) {
logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
- .doOnNext(::startServer)
+ .flatMap(::startServer)
.doOnError(::logServerStartFailed)
.then()
.block()
}
-private fun startServer(config: HvVesConfiguration) {
- stopRunningServer()
+private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
+ stopRunningServer()
+ .timeout(maxCloseTime)
+ .then(deferredVesServer(config))
+ .doOnNext {
+ registerShutdownHook { shutdownGracefully(it) }
+ hvVesServer.set(it)
+ }
+
+private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
logger.debug(ServiceContext::mdc) { "Configuration: $config" }
-
- VesServer.start(config).let {
- registerShutdownHook { shutdownGracefully(it) }
- hvVesServer.set(it)
- }
+ VesServer.start(config)
}
-private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
+private fun stopRunningServer() = Mono.defer {
+ hvVesServer.get()?.close() ?: Mono.empty()
+}
internal fun shutdownGracefully(serverHandle: ServerHandle,
healthState: HealthState = HealthState.INSTANCE) {
logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
healthState.changeState(HealthDescription.SHUTTING_DOWN)
- serverHandle.close().unsafeRunSync()
+ serverHandle.close().block(maxCloseTime)
logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index c079cc59..fc4d8662 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,11 +39,10 @@ object VesServer {
private val logger = Logger(VesServer::class)
- fun start(config: HvVesConfiguration): ServerHandle =
+ fun start(config: HvVesConfiguration): Mono<ServerHandle> =
createVesServer(config)
.start()
- .then(::logServerStarted)
- .unsafeRunSync()
+ .doOnNext(::logServerStarted)
private fun createVesServer(config: HvVesConfiguration): Server =
initializeCollectorFactory(config)
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 21c1fa31..539f7c2c 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -91,6 +91,7 @@
</appender>
<logger name="reactor.netty" level="WARN"/>
+ <logger name="reactor.netty.tcp.TcpServer" level="OFF"/>
<logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
<logger name="org.apache.kafka" level="INFO"/>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index d8de9f25..a967fba0 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.inOrder
import com.nhaarman.mockitokotlin2.mock
@@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
internal object MainTest : Spek({
describe("closeServer shutdown hook") {
given("server handles and health state") {
- val handle = mock<ServerHandle>()
+ val handle: ServerHandle = mock()
var closed = false
- val handleClose = IO {
- closed = true
- }
- whenever(handle.close()).thenReturn(handleClose)
+ whenever(handle.close()).thenReturn(Mono.empty<Void>().doOnSuccess { closed = true })
val healthState: HealthState = mock()
on("shutdownGracefully") {
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
index 00b814cc..ec654b32 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt
@@ -19,22 +19,18 @@
*/
package org.onap.dcae.collectors.veshv.utils
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
interface Closeable {
- fun close(): IO<Unit> = IO.unit
+ fun close(): Mono<Void> = Mono.empty()
companion object {
fun closeAll(closeables: Iterable<Closeable>) =
- IO.monadError().binding {
- closeables.forEach { it.close().bind() }
- }.fix()
+ Flux.fromIterable(closeables).flatMap(Closeable::close).then()
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index 5b582ed5..670ab4ac 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,8 +20,9 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable {
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
- override fun close() = IO {
- ctx.disposeNow(SHUTDOWN_TIMEOUT)
- }
+class NettyServerHandle(private val ctx: DisposableServer,
+ private val closeAction: Mono<Void> = Mono.empty())
+ : ServerHandle(ctx.host(), ctx.port()) {
+
+ override fun close(): Mono<Void> =
+ Mono.just(ctx)
+ .filter { !it.isDisposed }
+ .flatMap {
+ closeAction.thenReturn(it)
+ }
+ .then(dispose())
+
+ private fun dispose(): Mono<Void> =
+ Mono.create { callback ->
+ logger.debug { "About to dispose NettyServer" }
+ ctx.dispose()
+ ctx.onDispose {
+ logger.debug { "Netty server disposed" }
+ callback.success()
+ }
+ }
override fun await() = IO<Unit> {
ctx.channel().closeFuture().sync()
}
companion object {
- private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
+ private val logger = Logger(NettyServerHandle::class)
}
}