From 5e93c1ec9d690d7da15b7c0db0052121d8879471 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 14 Aug 2018 12:52:28 +0200 Subject: Remove Ratpack dependency for HV-VES health checks In order to minimize complexity and possibly improve performance (thread count) reactor-netty should be used instead of Ratpack. Also reorganize code to be more consistent and differentiated readiness and liveness endpoints (for future use in K8s Pod definition). As an example I've defined health check probe in docker-compose YAML. Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186 Issue-ID: DCAEGEN2-705 Signed-off-by: Piotr Jaszczyk --- .../kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt | 6 +----- .../onap/dcae/collectors/veshv/factory/CollectorFactory.kt | 8 ++++---- .../veshv/impl/adapters/ConsulConfigurationProvider.kt | 8 ++++---- .../dcae/collectors/veshv/impl/socket/NettyTcpServer.kt | 14 ++------------ .../veshv/impl/adapters/ConsulConfigurationProviderTest.kt | 10 +++++----- 5 files changed, 16 insertions(+), 30 deletions(-) (limited to 'hv-collector-core') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ff997173..6c256b72 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector interface Server { fun start(): IO } - -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO - abstract fun await(): IO -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3e652b92..a400ff32 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { + private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() @@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider, .map(this::createVesHvCollector) .doOnNext { logger.info("Using updated configuration for new connections") - healthStateProvider.changeState(HealthState.HEALTHY) + healthState.changeState(HealthDescription.HEALTHY) } .doOnError { logger.error("Failed to acquire configuration from consul") - healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 81463039..7de28306 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,8 +20,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, - private val healthStateProvider: HealthStateProvider, + private val healthState: HealthState, retrySpec: Retry ) : ConfigurationProvider { @@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) - healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } constructor(http: HttpAdapter, @@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, params.configurationUrl, params.firstRequestDelay, params.requestInterval, - HealthStateProvider.INSTANCE, + HealthState.INSTANCE, Retry.any() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index c28b1510..f858d959 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -24,15 +24,15 @@ import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer import java.time.Duration import java.util.function.BiFunction @@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, return this } - private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { - override fun shutdown() = IO { - ctx.shutdown() - } - - override fun await() = IO { - ctx.context.channel().closeFuture().sync() - } - } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index f9a9ba60..78858921 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.retry.Retry @@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({ describe("Consul configuration provider") { val httpAdapterMock: HttpAdapter = mock() - val healthStateProvider = HealthStateProvider.INSTANCE + val healthStateProvider = HealthState.INSTANCE given("valid resource url") { val validUrl = "http://valid-url/" @@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({ it("should update the health state"){ StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) .verifyComplete() } } @@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({ private fun constructConsulConfigProvider(url: String, httpAdapter: HttpAdapter, - healthStateProvider: HealthStateProvider, + healthState: HealthState, iterationCount: Long = 1 ): ConsulConfigurationProvider { @@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String, url, firstRequestDelay, requestInterval, - healthStateProvider, + healthState, retry ) } -- cgit 1.2.3-korg