From 302d27926c76bb99eecc4f74d333d0e8ff240c6e Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 2 Apr 2019 15:40:46 +0200 Subject: Fix shutting down when new config received bug When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk --- development/configuration/local.json | 20 ++++ sources/hv-collector-commandline/pom.xml | 4 - sources/hv-collector-core/pom.xml | 4 - .../org/onap/dcae/collectors/veshv/boundary/api.kt | 3 +- .../collectors/veshv/factory/CollectorFactory.kt | 5 +- .../dcae/collectors/veshv/impl/HvVesCollector.kt | 121 +++++++++++++++++++++ .../org/onap/dcae/collectors/veshv/impl/Router.kt | 25 +++-- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 121 --------------------- .../veshv/impl/adapters/kafka/KafkaPublisher.kt | 5 +- .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 16 ++- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 49 +++++---- .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 21 ++-- .../dcae/collectors/veshv/tests/component/Sut.kt | 3 +- .../veshv/tests/component/VesHvSpecification.kt | 4 +- .../onap/dcae/collectors/veshv/tests/fakes/sink.kt | 10 +- sources/hv-collector-main/pom.xml | 4 - .../org/onap/dcae/collectors/veshv/main/main.kt | 29 +++-- .../collectors/veshv/main/servers/VesServer.kt | 6 +- .../src/main/resources/logback.xml | 1 + .../onap/dcae/collectors/veshv/main/MainTest.kt | 9 +- .../onap/dcae/collectors/veshv/utils/Closeable.kt | 12 +- .../dcae/collectors/veshv/utils/server_handle.kt | 30 ++++- 22 files changed, 270 insertions(+), 232 deletions(-) create mode 100644 development/configuration/local.json create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt diff --git a/development/configuration/local.json b/development/configuration/local.json new file mode 100644 index 00000000..a1a8b533 --- /dev/null +++ b/development/configuration/local.json @@ -0,0 +1,20 @@ +{ + "logLevel": "DEBUG", + "server": { + "listenPort": 8061, + "idleTimeoutSec": 60, + "maxPayloadSizeBytes": 1048576 + }, + "cbs": { + "firstRequestDelaySec": 10, + "requestIntervalSec": 5 + }, + "security": { + "keys": { + "keyStoreFile": "development/ssl/server.p12", + "keyStorePassword": "onaponap", + "trustStoreFile": "development/ssl/trust.p12", + "trustStorePassword": "onaponap" + } + } +} \ No newline at end of file diff --git a/sources/hv-collector-commandline/pom.xml b/sources/hv-collector-commandline/pom.xml index 7f8643de..078a3cb5 100644 --- a/sources/hv-collector-commandline/pom.xml +++ b/sources/hv-collector-commandline/pom.xml @@ -40,10 +40,6 @@ org.jetbrains.kotlin kotlin-reflect - - io.arrow-kt - arrow-effects - org.assertj assertj-core diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index e7134e18..e15592f3 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -95,10 +95,6 @@ org.jetbrains.kotlin kotlin-reflect - - io.arrow-kt - arrow-effects - io.arrow-kt arrow-core diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ba0a9eee..0039ef62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable @@ -36,5 +35,5 @@ interface CollectorProvider : Closeable { } interface Server { - fun start(): IO + fun start(): Mono } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 1c79abd2..8fb4e80d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration, } private fun createVesHvCollector(ctx: ClientContext): Collector = - VesHvCollector( + HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt new file mode 100644 index 00000000..7d8f0cb1 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -0,0 +1,121 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ClientRejectionCause +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure +import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MessageEither +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class HvVesCollector( + private val clientContext: ClientContext, + private val wireChunkDecoder: WireChunkDecoder, + private val protobufDecoder: VesDecoder, + private val router: Router, + private val metrics: Metrics) : Collector { + + override fun handleConnection(dataStream: Flux): Mono = + dataStream + .transform { decodeWireFrame(it) } + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) + .transform(::route) + .handleErrors() + .doFinally { releaseBuffersMemory() } + .then() + + private fun decodeWireFrame(flux: Flux): Flux = flux + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } + .concatMap(wireChunkDecoder::decode) + .doOnNext(metrics::notifyMessageReceived) + + private fun filterInvalidWireFrame(flux: Flux): Flux = flux + .filterFailedWithLog { + MessageValidator + .validateFrameMessage(it) + .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } + } + + private fun decodeProtobufPayload(flux: Flux): Flux = flux + .flatMap { frame -> + protobufDecoder + .decode(frame) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } + .filterFailedWithLog(logger, clientContext::fullMdc, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + } + + private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux + .filterFailedWithLog { + MessageValidator + .validateProtobufMessage(it) + .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } + } + + private fun route(flux: Flux) = flux + .flatMap(router::route) + .doOnNext(this::updateSinkMetrics) + + private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { + when (consumedMessage) { + is SuccessfullyConsumedMessage -> + metrics.notifyMessageSent(consumedMessage.message) + is FailedToConsumeMessage -> + metrics.notifyMessageDropped(consumedMessage.cause) + } + } + + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } + + private fun Flux.handleErrors(): Flux = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + + private fun Flux.filterFailedWithLog(predicate: (T) -> MessageEither): Flux = + filterFailedWithLog(logger, clientContext::fullMdc, predicate) + + companion object { + private val logger = Logger(HvVesCollector::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 6e2e20f7..b03b89e1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing, fun route(message: VesMessage): Flux = routeFor(message.header) - .fold({ - metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) - logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } - logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } - Flux.empty() - }, { - logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } - Flux.just(it) - }) + .fold({ routeNotFound(message) }, { routeFound(message, it) }) .flatMap { val sinkTopic = it.sink.topicName() messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } + private fun routeNotFound(message: VesMessage): Flux { + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: $routing" } + return Flux.empty() + } + + private fun routeFound(message: VesMessage, route: Route): Flux { + logger.trace(ctx::fullMdc) { + "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION" + } + return Flux.just(route) + } + + private fun routeFor(header: CommonEventHeader) = routing.find { it.domain == header.domain }.toOption() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt deleted file mode 100644 index 618b818f..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ /dev/null @@ -1,121 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import io.netty.buffer.ByteBuf -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ClientRejectionCause -import org.onap.dcae.collectors.veshv.model.ConsumedMessage -import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage -import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure -import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class VesHvCollector( - private val clientContext: ClientContext, - private val wireChunkDecoder: WireChunkDecoder, - private val protobufDecoder: VesDecoder, - private val router: Router, - private val metrics: Metrics) : Collector { - - override fun handleConnection(dataStream: Flux): Mono = - dataStream - .transform { decodeWireFrame(it) } - .transform(::filterInvalidWireFrame) - .transform(::decodeProtobufPayload) - .transform(::filterInvalidProtobufMessages) - .transform(::route) - .handleErrors() - .doFinally { releaseBuffersMemory() } - .then() - - private fun decodeWireFrame(flux: Flux): Flux = flux - .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(wireChunkDecoder::decode) - .doOnNext(metrics::notifyMessageReceived) - - private fun filterInvalidWireFrame(flux: Flux): Flux = flux - .filterFailedWithLog { - MessageValidator - .validateFrameMessage(it) - .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } - } - - private fun decodeProtobufPayload(flux: Flux): Flux = flux - .flatMap { frame -> - protobufDecoder - .decode(frame) - .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } - .filterFailedWithLog(logger, clientContext::fullMdc, - { "Ves event header decoded successfully" }, - { "Failed to decode ves event header, reason: ${it.message}" }) - } - - private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux - .filterFailedWithLog { - MessageValidator - .validateProtobufMessage(it) - .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } - } - - private fun route(flux: Flux) = flux - .flatMap(router::route) - .doOnNext(this::updateSinkMetrics) - - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { - when (consumedMessage) { - is SuccessfullyConsumedMessage -> - metrics.notifyMessageSent(consumedMessage.message) - is FailedToConsumeMessage -> - metrics.notifyMessageDropped(consumedMessage.cause) - } - } - - private fun releaseBuffersMemory() = wireChunkDecoder.release() - .also { logger.debug { "Released buffer memory after handling message stream" } } - - private fun Flux.handleErrors(): Flux = onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) - } - - private fun Flux.filterFailedWithLog(predicate: (T) -> MessageEither): Flux = - filterFailedWithLog(logger, clientContext::fullMdc, predicate) - - companion object { - private val logger = Logger(VesHvCollector::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 2ce0f42f..7b726ab4 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -19,16 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 7a498652..86980832 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender @@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import reactor.kafka.sender.KafkaSender import java.util.Collections.synchronizedMap @@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider { } } - override fun close() = IO { - messageSinks.values.forEach { it.close() } - logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } - } + override fun close(): Mono = + Flux.fromIterable(messageSinks.values) + .publishOn(Schedulers.elastic()) + .doOnNext(KafkaSender::close) + .then() + .doOnSuccess { + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } + } companion object { private val logger = Logger(KafkaSinkProvider::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 3e19414d..a208384a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket import arrow.core.Option import arrow.core.getOrElse -import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider @@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { - override fun start(): IO = IO { - TcpServer.create() - .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } - .configureSsl() - .handle(this::handleConnection) - .doOnUnbound { - logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } - collectorProvider.close().unsafeRunSync() - } - .let { NettyServerHandle(it.bindNow()) } - } + override fun start(): Mono = + Mono.defer { + TcpServer.create() + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } + .configureSsl() + .handle(this::handleConnection) + .bind() + .map { + NettyServerHandle(it, closeAction()) + } + } + + private fun closeAction(): Mono = + collectorProvider.close().doOnSuccess { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + } + private fun TcpServer.configureSsl() = sslContext @@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = withNewClientContextFrom(nettyInbound, nettyOutbound) { clientContext -> - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + logger.debug(clientContext::fullMdc) { "Client connection request received" } clientContext.clientAddress .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } @@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { metrics.notifyClientConnected() - logger.info(clientContext::fullMdc) { "Handling new client connection" } + logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } val collector = collectorProvider(clientContext) return collector.handleClient(clientContext, nettyInbound) } private fun Collector.handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - connection - .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) - .logConnectionClosed(clientContext) - }.run { - handleConnection(nettyInbound.createDataStream()) - } + nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + connection + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + handleConnection(nettyInbound.createDataStream()) + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b735138d..ca9d28ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.wire -import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer import reactor.core.publisher.SynchronousSink @@ -63,26 +62,22 @@ internal class WireChunkDecoder( private fun generateFrames(): Flux = Flux.generate { next -> decoder.decodeFirst(streamBuffer) .fold(onError(next), onSuccess(next)) - .unsafeRunSync() } - private fun onError(next: SynchronousSink): (WireFrameDecodingError) -> IO = { err -> + private fun onError(next: SynchronousSink): (WireFrameDecodingError) -> Unit = { err -> when (err) { - is InvalidWireFrame -> IO { + is InvalidWireFrame -> next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { + is MissingWireFrameBytes -> { logEndOfData() next.complete() } } } - private fun onSuccess(next: SynchronousSink): (WireFrameMessage) -> IO = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } + private fun onSuccess(next: SynchronousSink): (WireFrameMessage) -> Unit = { frame -> + logDecodedWireMessage(frame) + next.next(frame) } private fun logIncomingMessage(wire: ByteBuf) { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index f79c2e46..95b9159e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { if (sinkInitialized.get()) { sink.close() } else { - IO.unit + Mono.empty() } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2430c74f..d845f7c4 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -67,7 +67,7 @@ object VesHvSpecification : Spek({ // just connecting should not create sink sut.handleConnection() - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isFalse() @@ -80,7 +80,7 @@ object VesHvSpecification : Spek({ sut.handleConnection(vesWireFrameMessage(PERF3GPP)) // when - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isTrue() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 160defdb..f1b1ba2d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque @@ -50,12 +51,9 @@ class StoringSink : Sink { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } - /* - * TOD0: if the code would look like: - * ```IO { active.set(false) }``` - * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) - */ - override fun close() = active.set(false).run { IO.unit } + override fun close(): Mono = Mono.fromRunnable { + active.set(false) + } } /** diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index edbdaa36..57f21a66 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -95,10 +95,6 @@ io.arrow-kt arrow-core - - io.arrow-kt - arrow-effects-instances - io.arrow-kt arrow-syntax diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc207ef8..8b0a38bb 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers +import java.time.Duration import java.util.concurrent.atomic.AtomicReference @@ -39,6 +41,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference() private val configurationModule = ConfigurationModule() +private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array) { val configStateListener = object : ConfigurationStateListener { @@ -60,30 +63,36 @@ fun main(args: Array) { logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) } HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } - .doOnNext(::startServer) + .flatMap(::startServer) .doOnError(::logServerStartFailed) .then() .block() } -private fun startServer(config: HvVesConfiguration) { - stopRunningServer() +private fun startServer(config: HvVesConfiguration): Mono = + stopRunningServer() + .timeout(maxCloseTime) + .then(deferredVesServer(config)) + .doOnNext { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } + +private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - - VesServer.start(config).let { - registerShutdownHook { shutdownGracefully(it) } - hvVesServer.set(it) - } + VesServer.start(config) } -private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() +private fun stopRunningServer() = Mono.defer { + hvVesServer.get()?.close() ?: Mono.empty() +} internal fun shutdownGracefully(serverHandle: ServerHandle, healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - serverHandle.close().unsafeRunSync() + serverHandle.close().block(maxCloseTime) logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index c079cc59..fc4d8662 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk @@ -38,11 +39,10 @@ object VesServer { private val logger = Logger(VesServer::class) - fun start(config: HvVesConfiguration): ServerHandle = + fun start(config: HvVesConfiguration): Mono = createVesServer(config) .start() - .then(::logServerStarted) - .unsafeRunSync() + .doOnNext(::logServerStarted) private fun createVesServer(config: HvVesConfiguration): Server = initializeCollectorFactory(config) diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 21c1fa31..539f7c2c 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -91,6 +91,7 @@ + diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index d8de9f25..a967fba0 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.inOrder import com.nhaarman.mockitokotlin2.mock @@ -34,6 +33,7 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk @@ -42,12 +42,9 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle internal object MainTest : Spek({ describe("closeServer shutdown hook") { given("server handles and health state") { - val handle = mock() + val handle: ServerHandle = mock() var closed = false - val handleClose = IO { - closed = true - } - whenever(handle.close()).thenReturn(handleClose) + whenever(handle.close()).thenReturn(Mono.empty().doOnSuccess { closed = true }) val healthState: HealthState = mock() on("shutdownGracefully") { diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt index 00b814cc..ec654b32 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/Closeable.kt @@ -19,22 +19,18 @@ */ package org.onap.dcae.collectors.veshv.utils -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.binding +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk * @since January 2019 */ interface Closeable { - fun close(): IO = IO.unit + fun close(): Mono = Mono.empty() companion object { fun closeAll(closeables: Iterable) = - IO.monadError().binding { - closeables.forEach { it.close().bind() } - }.fix() + Flux.fromIterable(closeables).flatMap(Closeable::close).then() } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index 5b582ed5..670ab4ac 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,8 +20,9 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import reactor.netty.DisposableServer -import java.time.Duration /** * @author Piotr Jaszczyk @@ -35,16 +36,33 @@ abstract class ServerHandle(val host: String, val port: Int) : Closeable { * @author Piotr Jaszczyk * @since August 2018 */ -class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { - override fun close() = IO { - ctx.disposeNow(SHUTDOWN_TIMEOUT) - } +class NettyServerHandle(private val ctx: DisposableServer, + private val closeAction: Mono = Mono.empty()) + : ServerHandle(ctx.host(), ctx.port()) { + + override fun close(): Mono = + Mono.just(ctx) + .filter { !it.isDisposed } + .flatMap { + closeAction.thenReturn(it) + } + .then(dispose()) + + private fun dispose(): Mono = + Mono.create { callback -> + logger.debug { "About to dispose NettyServer" } + ctx.dispose() + ctx.onDispose { + logger.debug { "Netty server disposed" } + callback.success() + } + } override fun await() = IO { ctx.channel().closeFuture().sync() } companion object { - private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10) + private val logger = Logger(NettyServerHandle::class) } } -- cgit 1.2.3-korg