From 5e93c1ec9d690d7da15b7c0db0052121d8879471 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 14 Aug 2018 12:52:28 +0200 Subject: Remove Ratpack dependency for HV-VES health checks In order to minimize complexity and possibly improve performance (thread count) reactor-netty should be used instead of Ratpack. Also reorganize code to be more consistent and differentiated readiness and liveness endpoints (for future use in K8s Pod definition). As an example I've defined health check probe in docker-compose YAML. Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186 Issue-ID: DCAEGEN2-705 Signed-off-by: Piotr Jaszczyk --- hv-collector-main/pom.xml | 4 -- .../org/onap/dcae/collectors/veshv/main/main.kt | 48 ++++++-------------- .../veshv/main/servers/HealthCheckServer.kt | 39 +++++++++++++++++ .../collectors/veshv/main/servers/ServerStarter.kt | 42 ++++++++++++++++++ .../collectors/veshv/main/servers/VesServer.kt | 51 ++++++++++++++++++++++ 5 files changed, 145 insertions(+), 39 deletions(-) create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt (limited to 'hv-collector-main') diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index 0e956288..af64cedd 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -98,10 +98,6 @@ ${project.parent.version} test - - io.ratpack - ratpack-core - io.arrow-kt arrow-core diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc92228f..a84a39a5 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,13 +19,12 @@ */ package org.onap.dcae.collectors.veshv.main -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle -import org.onap.dcae.collectors.veshv.factory.CollectorFactory -import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider -import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer +import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync @@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK fun main(args: Array) = ArgVesHvConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startHealthCheckApiServer) - .map(::createServer) - .map { - it.start() - .map(::logServerStarted) - .flatMap(ServerHandle::await) - } + .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> logger.error("Failed to start a server", ex) @@ -53,24 +46,9 @@ fun main(args: Array) = { logger.info("Gentle shutdown") } ) -private fun createServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() - val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, - MicrometerMetrics() - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider) -} - -private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also { - logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}") -} - -private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply { - HealthCheckApiServer(HealthStateProvider.INSTANCE) - .start(healthCheckApiPort) - .unsafeRunSync() - .also { logger.info("Health check api server started on port ${it.bindPort}") } -} +private fun startAndAwaitServers(config: ServerConfiguration) = + IO.monad().binding { + HealthCheckServer.start(config).bind() + VesServer.start(config).bind() + .await().bind() + }.fix() diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt new file mode 100644 index 00000000..04fc021d --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +object HealthCheckServer : ServerStarter() { + override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() + + private fun createHealthCheckServer(config: ServerConfiguration) = + HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort) + + override fun serverStartedMessage(handle: ServerHandle) = + "Health check server is up and listening on ${handle.host}:${handle.port}" +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt new file mode 100644 index 00000000..5c6f1277 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +abstract class ServerStarter { + fun start(config: ServerConfiguration): IO = + startServer(config) + .map { logger.info(serverStartedMessage(it)); it } + + protected abstract fun startServer(config: ServerConfiguration): IO + protected abstract fun serverStartedMessage(handle: ServerHandle): String + + companion object { + private val logger = Logger(ServerStarter::class) + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt new file mode 100644 index 00000000..fbf8936f --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.main.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +object VesServer : ServerStarter() { + override fun startServer(config: ServerConfiguration): IO = createVesServer(config).start() + + private fun createVesServer(config: ServerConfiguration): Server { + val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() + val collectorProvider = CollectorFactory( + AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + sink, + MicrometerMetrics() + ).createVesHvCollectorProvider() + + return ServerFactory.createNettyTcpServer(config, collectorProvider) + } + + override fun serverStartedMessage(handle: ServerHandle) = + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +} \ No newline at end of file -- cgit 1.2.3-korg