From 302d27926c76bb99eecc4f74d333d0e8ff240c6e Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 2 Apr 2019 15:40:46 +0200 Subject: Fix shutting down when new config received bug When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk --- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 3 +- .../collectors/veshv/factory/CollectorFactory.kt | 5 +- .../dcae/collectors/veshv/impl/HvVesCollector.kt | 121 +++++++++++++++++++++ .../org/onap/dcae/collectors/veshv/impl/Router.kt | 25 +++-- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 121 --------------------- .../veshv/impl/adapters/kafka/KafkaPublisher.kt | 5 +- .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 16 ++- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 49 +++++---- .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 21 ++-- 9 files changed, 188 insertions(+), 178 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt (limited to 'sources/hv-collector-core/src/main') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ba0a9eee..0039ef62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable @@ -36,5 +35,5 @@ interface CollectorProvider : Closeable { } interface Server { - fun start(): IO + fun start(): Mono } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 1c79abd2..8fb4e80d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration, } private fun createVesHvCollector(ctx: ClientContext): Collector = - VesHvCollector( + HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt new file mode 100644 index 00000000..7d8f0cb1 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -0,0 +1,121 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure +import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MessageEither +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class HvVesCollector( + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, + private val protobufDecoder: VesDecoder, + private val router: Router, + private val metrics: Metrics) : Collector { + + override fun handleConnection(dataStream: Flux): Mono = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::route) + .handleErrors() + .doFinally { releaseBuffersMemory() } + .then() + + private fun decodeWireFrame(flux: Flux): Flux = flux + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } + .concatMap(wireChunkDecoder::decode) + .doOnNext(metrics::notifyMessageReceived) + + private fun filterInvalidWireFrame(flux: Flux): Flux = flux + .filterFailedWithLog { + MessageValidator + .validateFrameMessage(it) + .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } + } + + private fun decodeProtobufPayload(flux: Flux): Flux = flux + .flatMap { frame -> + protobufDecoder + .decode(frame) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } + .filterFailedWithLog(logger, clientContext::fullMdc, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + } + + private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux + .filterFailedWithLog { + MessageValidator + .validateProtobufMessage(it) + .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } + } + + private fun route(flux: Flux) = flux + .flatMap(router::route) + .doOnNext(this::updateSinkMetrics) + + private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { + when (consumedMessage) { + is SuccessfullyConsumedMessage -> + metrics.notifyMessageSent(consumedMessage.message) + is FailedToConsumeMessage -> + metrics.notifyMessageDropped(consumedMessage.cause) + } + } + + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } + + private fun Flux.handleErrors(): Flux = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + + private fun Flux.filterFailedWithLog(predicate: (T) -> MessageEither): Flux = + filterFailedWithLog(logger, clientContext::fullMdc, predicate) + + companion object { + private val logger = Logger(HvVesCollector::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 6e2e20f7..b03b89e1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing, fun route(message: VesMessage): Flux = routeFor(message.header) - .fold({ - metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) - logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } - logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } - Flux.empty() - }, { - logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } - Flux.just(it) - }) + .fold({ routeNotFound(message) }, { routeFound(message, it) }) .flatMap { val sinkTopic = it.sink.topicName() messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } + private fun routeNotFound(message: VesMessage): Flux { + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: $routing" } + return Flux.empty() + } + + private fun routeFound(message: VesMessage, route: Route): Flux { + logger.trace(ctx::fullMdc) { + "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION" + } + return Flux.just(route) + } + + private fun routeFor(header: CommonEventHeader) = routing.find { it.domain == header.domain }.toOption() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt deleted file mode 100644 index 618b818f..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import io.netty.buffer.ByteBuf -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause -import org.onap.dcae.collectors.veshv.model.ConsumedMessage -import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage -import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure -import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class VesHvCollector( - private val clientContext: ClientContext, - private val wireChunkDecoder: WireChunkDecoder, - private val protobufDecoder: VesDecoder, - private val router: Router, - private val metrics: Metrics) : Collector { - - override fun handleConnection(dataStream: Flux): Mono = - dataStream - .transform { decodeWireFrame(it) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::route) - .handleErrors() - .doFinally { releaseBuffersMemory() } - .then() - - private fun decodeWireFrame(flux: Flux): Flux = flux - .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(wireChunkDecoder::decode) - .doOnNext(metrics::notifyMessageReceived) - - private fun filterInvalidWireFrame(flux: Flux): Flux = flux - .filterFailedWithLog { - MessageValidator - .validateFrameMessage(it) - .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } - } - - private fun decodeProtobufPayload(flux: Flux): Flux = flux - .flatMap { frame -> - protobufDecoder - .decode(frame) - .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } - .filterFailedWithLog(logger, clientContext::fullMdc, - { "Ves event header decoded successfully" }, - { "Failed to decode ves event header, reason: ${it.message}" }) - } - - private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux - .filterFailedWithLog { - MessageValidator - .validateProtobufMessage(it) - .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } - } - - private fun route(flux: Flux) = flux - .flatMap(router::route) - .doOnNext(this::updateSinkMetrics) - - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { - when (consumedMessage) { - is SuccessfullyConsumedMessage -> - metrics.notifyMessageSent(consumedMessage.message) - is FailedToConsumeMessage -> - metrics.notifyMessageDropped(consumedMessage.cause) - } - } - - private fun releaseBuffersMemory() = wireChunkDecoder.release() - .also { logger.debug { "Released buffer memory after handling message stream" } } - - private fun Flux.handleErrors(): Flux = onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) - } - - private fun Flux.filterFailedWithLog(predicate: (T) -> MessageEither): Flux = - filterFailedWithLog(logger, clientContext::fullMdc, predicate) - - companion object { - private val logger = Logger(VesHvCollector::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 2ce0f42f..7b726ab4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 7a498652..86980832 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender @@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import reactor.kafka.sender.KafkaSender import java.util.Collections.synchronizedMap @@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider { } } - override fun close() = IO { - messageSinks.values.forEach { it.close() } - logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } - } + override fun close(): Mono = + Flux.fromIterable(messageSinks.values) + .publishOn(Schedulers.elastic()) + .doOnNext(KafkaSender::close) + .then() + .doOnSuccess { + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } + } companion object { private val logger = Logger(KafkaSinkProvider::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 3e19414d..a208384a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.Option import arrow.core.getOrElse -import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider @@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { - override fun start(): IO = IO { - TcpServer.create() - .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } - .configureSsl() - .handle(this::handleConnection) - .doOnUnbound { - logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } - collectorProvider.close().unsafeRunSync() - } - .let { NettyServerHandle(it.bindNow()) } - } + override fun start(): Mono = + Mono.defer { + TcpServer.create() + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } + .configureSsl() + .handle(this::handleConnection) + .bind() + .map { + NettyServerHandle(it, closeAction()) + } + } + + private fun closeAction(): Mono = + collectorProvider.close().doOnSuccess { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + } + private fun TcpServer.configureSsl() = sslContext @@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = withNewClientContextFrom(nettyInbound, nettyOutbound) { clientContext -> - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + logger.debug(clientContext::fullMdc) { "Client connection request received" } clientContext.clientAddress .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } @@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { metrics.notifyClientConnected() - logger.info(clientContext::fullMdc) { "Handling new client connection" } + logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } val collector = collectorProvider(clientContext) return collector.handleClient(clientContext, nettyInbound) } private fun Collector.handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - connection - .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) - .logConnectionClosed(clientContext) - }.run { - handleConnection(nettyInbound.createDataStream()) - } + nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + connection + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + handleConnection(nettyInbound.createDataStream()) + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b735138d..ca9d28ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.wire -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink @@ -63,26 +62,22 @@ internal class WireChunkDecoder( private fun generateFrames(): Flux = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) - .unsafeRunSync() } - private fun onError(next: SynchronousSink): (WireFrameDecodingError) -> IO = { err -> + private fun onError(next: SynchronousSink): (WireFrameDecodingError) -> Unit = { err -> when (err) { - is InvalidWireFrame -> IO { + is InvalidWireFrame -> next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { + is MissingWireFrameBytes -> { logEndOfData() next.complete() } } } - private fun onSuccess(next: SynchronousSink): (WireFrameMessage) -> IO = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } + private fun onSuccess(next: SynchronousSink): (WireFrameMessage) -> Unit = { frame -> + logDecodedWireMessage(frame) + next.next(frame) } private fun logIncomingMessage(wire: ByteBuf) { -- cgit 1.2.3-korg