diff options
Diffstat (limited to 'sources/hv-collector-core/src')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index c76233f0..adc629bc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -93,7 +93,12 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } - return collectorProvider(clientContext).fold( + messageHandlingStream(clientContext, nettyInbound).subscribe() + return nettyOutbound.neverComplete() + } + + private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> = + collectorProvider(clientContext).fold( { logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." } Mono.empty() @@ -108,7 +113,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, it.handleConnection(createDataStream(nettyInbound)) } ) - } private fun populateClientContext(clientContext: ClientContext, connection: Connection) { clientContext.clientAddress = try { |