diff options
Diffstat (limited to 'hv-collector-main')
5 files changed, 145 insertions, 39 deletions
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index 0e956288..af64cedd 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -99,10 +99,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>io.ratpack</groupId> - <artifactId>ratpack-core</artifactId> - </dependency> - <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> </dependency> diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc92228f..a84a39a5 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,13 +19,12 @@ */ package org.onap.dcae.collectors.veshv.main -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle -import org.onap.dcae.collectors.veshv.factory.CollectorFactory -import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider -import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer +import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync @@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK fun main(args: Array<String>) = ArgVesHvConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startHealthCheckApiServer) - .map(::createServer) - .map { - it.start() - .map(::logServerStarted) - .flatMap(ServerHandle::await) - } + .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> logger.error("Failed to start a server", ex) @@ -53,24 +46,9 @@ fun main(args: Array<String>) = { logger.info("Gentle shutdown") } ) -private fun createServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() - val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, - MicrometerMetrics() - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider) -} - -private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also { - logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}") -} - -private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply { - HealthCheckApiServer(HealthStateProvider.INSTANCE) - .start(healthCheckApiPort) - .unsafeRunSync() - .also { logger.info("Health check api server started on port ${it.bindPort}") } -} +private fun startAndAwaitServers(config: ServerConfiguration) = + IO.monad().binding { + HealthCheckServer.start(config).bind() + VesServer.start(config).bind() + .await().bind() + }.fix() diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt new file mode 100644 index 00000000..04fc021d --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +object HealthCheckServer : ServerStarter() { + override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() + + private fun createHealthCheckServer(config: ServerConfiguration) = + HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort) + + override fun serverStartedMessage(handle: ServerHandle) = + "Health check server is up and listening on ${handle.host}:${handle.port}" +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt new file mode 100644 index 00000000..5c6f1277 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +abstract class ServerStarter { + fun start(config: ServerConfiguration): IO<ServerHandle> = + startServer(config) + .map { logger.info(serverStartedMessage(it)); it } + + protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> + protected abstract fun serverStartedMessage(handle: ServerHandle): String + + companion object { + private val logger = Logger(ServerStarter::class) + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt new file mode 100644 index 00000000..fbf8936f --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.main.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +object VesServer : ServerStarter() { + override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() + + private fun createVesServer(config: ServerConfiguration): Server { + val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() + val collectorProvider = CollectorFactory( + AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + sink, + MicrometerMetrics() + ).createVesHvCollectorProvider() + + return ServerFactory.createNettyTcpServer(config, collectorProvider) + } + + override fun serverStartedMessage(handle: ServerHandle) = + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +}
\ No newline at end of file |