aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-02 15:40:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-03 08:51:03 +0200
commit302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch)
treec9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-core/src/main
parent6a00e38550fd1745c3377da2099bf5a615f69053 (diff)
Fix shutting down when new config received bug
When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt)4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt25
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt49
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt21
8 files changed, 69 insertions, 59 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ba0a9eee..0039ef62 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -36,5 +35,5 @@ interface CollectorProvider : Closeable {
}
interface Server {
- fun start(): IO<ServerHandle>
+ fun start(): Mono<ServerHandle>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 1c79abd2..8fb4e80d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration,
}
private fun createVesHvCollector(ctx: ClientContext): Collector =
- VesHvCollector(
+ HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index 618b818f..7d8f0cb1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -44,7 +44,7 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class VesHvCollector(
+internal class HvVesCollector(
private val clientContext: ClientContext,
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
@@ -116,6 +116,6 @@ internal class VesHvCollector(
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
- private val logger = Logger(VesHvCollector::class)
+ private val logger = Logger(HvVesCollector::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index 6e2e20f7..b03b89e1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing,
fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .fold({
- metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
- logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
- logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
- Flux.empty<Route>()
- }, {
- logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
- Flux.just(it)
- })
+ .fold({ routeNotFound(message) }, { routeFound(message, it) })
.flatMap {
val sinkTopic = it.sink.topicName()
messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
}
+ private fun routeNotFound(message: VesMessage): Flux<Route> {
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
+ return Flux.empty<Route>()
+ }
+
+ private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
+ logger.trace(ctx::fullMdc) {
+ "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
+ }
+ return Flux.just(route)
+ }
+
+
private fun routeFor(header: CommonEventHeader) =
routing.find { it.domain == header.domain }.toOption()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 2ce0f42f..7b726ab4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -19,16 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 7a498652..86980832 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
@@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
import reactor.kafka.sender.KafkaSender
import java.util.Collections.synchronizedMap
@@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider {
}
}
- override fun close() = IO {
- messageSinks.values.forEach { it.close() }
- logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
- }
+ override fun close(): Mono<Void> =
+ Flux.fromIterable(messageSinks.values)
+ .publishOn(Schedulers.elastic())
+ .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close)
+ .then()
+ .doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
+ }
companion object {
private val logger = Logger(KafkaSinkProvider::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 3e19414d..a208384a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
@@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private val collectorProvider: CollectorProvider,
private val metrics: Metrics) : Server {
- override fun start(): IO<ServerHandle> = IO {
- TcpServer.create()
- .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
- .configureSsl()
- .handle(this::handleConnection)
- .doOnUnbound {
- logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
- collectorProvider.close().unsafeRunSync()
- }
- .let { NettyServerHandle(it.bindNow()) }
- }
+ override fun start(): Mono<ServerHandle> =
+ Mono.defer {
+ TcpServer.create()
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
+ .configureSsl()
+ .handle(this::handleConnection)
+ .bind()
+ .map {
+ NettyServerHandle(it, closeAction())
+ }
+ }
+
+ private fun closeAction(): Mono<Void> =
+ collectorProvider.close().doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ }
+
private fun TcpServer.configureSsl() =
sslContext
@@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
withNewClientContextFrom(nettyInbound, nettyOutbound)
{ clientContext ->
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc) { "Client connection request received" }
clientContext.clientAddress
.map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
@@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
- logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
val collector = collectorProvider(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}
private fun Collector.handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- connection
- .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
- .logConnectionClosed(clientContext)
- }.run {
- handleConnection(nettyInbound.createDataStream())
- }
+ nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ connection
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ handleConnection(nettyInbound.createDataStream())
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index b735138d..ca9d28ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -19,17 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.wire
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Flux.defer
import reactor.core.publisher.SynchronousSink
@@ -63,26 +62,22 @@ internal class WireChunkDecoder(
private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
- .unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err ->
when (err) {
- is InvalidWireFrame -> IO {
+ is InvalidWireFrame ->
next.error(WireFrameException(err))
- }
- is MissingWireFrameBytes -> IO {
+ is MissingWireFrameBytes -> {
logEndOfData()
next.complete()
}
}
}
- private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame ->
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
private fun logIncomingMessage(wire: ByteBuf) {