diff options
Diffstat (limited to 'sources/hv-collector-main/src/main')
4 files changed, 95 insertions, 105 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 39fcae21..c8a3c013 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,58 +19,58 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monad.monad -import arrow.typeclasses.binding -import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.neverComplete import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.scheduler.Schedulers +import java.util.concurrent.atomic.AtomicReference -private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv" -private val logger = Logger("$VESHV_PACKAGE.main") -private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt" -fun main(args: Array<String>) = - ConfigurationModule() - .createConfigurationFromCommandLine(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } - ) +private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv" +private val logger = Logger("$VES_HV_PACKAGE.main") -private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } +private val hvVesServer = AtomicReference<ServerHandle>() - val healthCheckServerHandle = HealthCheckServer.start(config).bind() - val hvVesHandle = VesServer.start(config).bind() +fun main(args: Array<String>) { + HealthCheckServer.start() + ConfigurationModule() + .hvVesConfigurationUpdates(args) + .publishOn(Schedulers.single(Schedulers.elastic())) + .doOnNext(::startServer) + .doOnError(::logServerStartFailed) + .neverComplete() // TODO: remove after merging configuration stream with cbs + .block() +} + +private fun startServer(config: HvVesConfiguration) { + stopRunningServer() + Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + + VesServer.start(config).let { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } +} - registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) - hvVesHandle.await().bind() - }.fix() +private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() -internal fun closeServers(vararg handles: ServerHandle, - healthState: HealthState = HealthState.INSTANCE) = { +internal fun shutdownGracefully(serverHandle: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).unsafeRunSync() + serverHandle.close().unsafeRunSync() logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } + +private fun logServerStartFailed(ex: Throwable) = + logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } + diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index 15472b5e..bc284d08 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -19,25 +19,38 @@ */ package org.onap.dcae.collectors.veshv.main.servers -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.net.InetSocketAddress /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -object HealthCheckServer : ServerStarter() { - override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() +object HealthCheckServer { - private fun createHealthCheckServer(config: ServerConfiguration) = + private const val DEFAULT_HEALTHCHECK_PORT = 6060 + private val logger = Logger(HealthCheckServer::class) + + fun start(port: Int = DEFAULT_HEALTHCHECK_PORT) = + createHealthCheckServer(port) + .start() + .then(::logServerStarted) + .unsafeRunSync() + + private fun createHealthCheckServer(listenPort: Int) = HealthCheckApiServer( HealthState.INSTANCE, MicrometerMetrics.INSTANCE.metricsProvider, - config.healthCheckApiListenAddress) + InetSocketAddress(listenPort)) - override fun serverStartedMessage(handle: ServerHandle) = - "Health check server is up and listening on ${handle.host}:${handle.port}" + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "Health check server is up and listening on ${handle.host}:${handle.port}" + } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt deleted file mode 100644 index 74a66324..00000000 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main.servers - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.then -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since August 2018 - */ -abstract class ServerStarter { - fun start(config: ServerConfiguration): IO<ServerHandle> = - startServer(config) - .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } } - - protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> - protected abstract fun serverStartedMessage(handle: ServerHandle): String - - companion object { - private val logger = Logger(ServerStarter::class) - } -} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 0f5e45ec..d15dccef 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -19,33 +19,54 @@ */ package org.onap.dcae.collectors.veshv.main.servers -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -object VesServer : ServerStarter() { - override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() - - private fun createVesServer(config: ServerConfiguration): Server { - val collectorProvider = CollectorFactory( - AdapterFactory.configurationProvider(config.configurationProviderParams), - AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), - MicrometerMetrics.INSTANCE, - config.maximumPayloadSizeBytes - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE) - } - - override fun serverStartedMessage(handle: ServerHandle) = - "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +object VesServer { + + private val logger = Logger(VesServer::class) + + fun start(config: HvVesConfiguration): ServerHandle = + createVesServer(config) + .start() + .then(::logServerStarted) + .unsafeRunSync() + + private fun createVesServer(config: HvVesConfiguration): Server = + initializeCollectorFactory(config) + .createVesHvCollectorProvider() + .let { collectorProvider -> + ServerFactory.createNettyTcpServer( + config.server, + config.security, + collectorProvider, + MicrometerMetrics.INSTANCE + ) + } + + private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = + CollectorFactory( + AdapterFactory.configurationProvider(config.cbs), + AdapterFactory.sinkCreatorFactory(config.collector), + MicrometerMetrics.INSTANCE, + config.server.maxPayloadSizeBytes + ) + + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" + } + } |