diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-02-28 17:33:02 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-11 14:51:58 +0100 |
commit | 1422bed4c1a002e663fd7c2c4c204831e5f7aa9a (patch) | |
tree | 18247d0af6cb8aa615934969c24249227e07d9a2 /sources/hv-collector-main/src/main | |
parent | cc581b7f4833fe47ad3eb846d432ca0ffa1c66f4 (diff) |
Use CBS by means of SDK in place of Consul
- changed IO creation in main to fix error with too early calling
changeState method on collector HealthState
- increased coverage a little with few tests
- corrected coverage-report pom file to include new modules
- temporarily changed to 1.1.4-SNAPSHOT version of sdk due to need
of new version of CBSLookup
Change-Id: Ic73b46cf881ab4fabf52bef0327b09082aa90dc6
Issue-ID: DCAEGEN2-1302
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-main/src/main')
3 files changed, 27 insertions, 30 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 99ec5e1e..bb484cfe 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,8 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC @@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration KAFKA_SERVERS, HEALTH_CHECK_API_PORT, LISTEN_PORT, - CONSUL_CONFIG_URL, - CONSUL_FIRST_REQUEST_DELAY, - CONSUL_REQUEST_INTERVAL, + CONFIGURATION_FIRST_REQUEST_DELAY, + CONFIGURATION_REQUEST_INTERVAL, SSL_DISABLE, KEY_STORE_FILE, KEY_STORE_PASSWORD, @@ -117,17 +115,15 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> = Option.monad().binding { - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() val firstRequestDelay = cmdLine.longValue( - CONSUL_FIRST_REQUEST_DELAY, - DefaultValues.CONSUL_FIRST_REQUEST_DELAY + CONFIGURATION_FIRST_REQUEST_DELAY, + DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY ) val requestInterval = cmdLine.longValue( - CONSUL_REQUEST_INTERVAL, - DefaultValues.CONSUL_REQUEST_INTERVAL + CONFIGURATION_REQUEST_INTERVAL, + DefaultValues.CONFIGURATION_REQUEST_INTERVAL ) ConfigurationProviderParams( - configUrl, Duration.ofSeconds(firstRequestDelay), Duration.ofSeconds(requestInterval) ) @@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration internal object DefaultValues { const val HEALTH_CHECK_API_PORT = 6060 - const val CONSUL_FIRST_REQUEST_DELAY = 10L - const val CONSUL_REQUEST_INTERVAL = 5L + const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L + const val CONFIGURATION_REQUEST_INTERVAL = 5L const val IDLE_TIMEOUT_SEC = 60L const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES val LOG_LEVEL = LogLevel.INFO.name diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index f00b9ee4..d21b490c 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monad.monad -import arrow.effects.instances.io.monadError.monadError import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription @@ -51,23 +50,25 @@ fun main(args: Array<String>) = logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.debug(ServiceContext::mdc) { "Finished" } } + { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() - VesServer.start(config).bind().let { handle -> - registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() - handle.await().bind() - } - }.fix() + val hvVesHandle = VesServer.start(config).bind() -internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO<Unit> = - IO.monadError().binding { - healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).bind() - logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } + registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) + hvVesHandle.await().bind() }.fix() + +internal fun closeServers(vararg handles: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) = { + logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } + healthState.changeState(HealthDescription.SHUTTING_DOWN) + Closeable.closeAll(handles.asIterable()).unsafeRunSync() + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } +} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 4e2e6d86..62c24308 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ object VesServer : ServerStarter() { private fun createVesServer(config: ServerConfiguration): Server { val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + AdapterFactory.configurationProvider(config.configurationProviderParams), AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes |