diff options
Diffstat (limited to 'sources/hv-collector-main')
4 files changed, 171 insertions, 97 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index ae87f1c2..2311b2ba 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Option import arrow.core.fix +import arrow.core.getOrElse import arrow.instances.option.monad.monad import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine @@ -30,7 +31,7 @@ import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.* import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY @@ -45,73 +46,90 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMU import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.utils.commandline.hasOption -import org.onap.dcae.collectors.veshv.utils.commandline.intValue -import org.onap.dcae.collectors.veshv.utils.commandline.longValue -import org.onap.dcae.collectors.veshv.utils.commandline.stringValue +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LOG_LEVEL +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.net.InetSocketAddress import java.time.Duration + internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { override val cmdLineOptionsList = listOf( - KAFKA_SERVERS, - HEALTH_CHECK_API_PORT, - LISTEN_PORT, - CONSUL_CONFIG_URL, - CONSUL_FIRST_REQUEST_DELAY, - CONSUL_REQUEST_INTERVAL, - SSL_DISABLE, - KEY_STORE_FILE, - KEY_STORE_PASSWORD, - TRUST_STORE_FILE, - TRUST_STORE_PASSWORD, - IDLE_TIMEOUT_SEC, - MAXIMUM_PAYLOAD_SIZE_BYTES, - DUMMY_MODE + KAFKA_SERVERS, + HEALTH_CHECK_API_PORT, + LISTEN_PORT, + CONSUL_CONFIG_URL, + CONSUL_FIRST_REQUEST_DELAY, + CONSUL_REQUEST_INTERVAL, + SSL_DISABLE, + KEY_STORE_FILE, + KEY_STORE_PASSWORD, + TRUST_STORE_FILE, + TRUST_STORE_PASSWORD, + IDLE_TIMEOUT_SEC, + MAXIMUM_PAYLOAD_SIZE_BYTES, + DUMMY_MODE, + LOG_LEVEL ) override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> = - Option.monad().binding { - val healthCheckApiPort = cmdLine.intValue( - HEALTH_CHECK_API_PORT, - DefaultValues.HEALTH_CHECK_API_PORT - ) - val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() - val listenPort = cmdLine.intValue(LISTEN_PORT).bind() - val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) - val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, - DefaultValues.MAX_PAYLOAD_SIZE_BYTES) - val dummyMode = cmdLine.hasOption(DUMMY_MODE) - val security = createSecurityConfiguration(cmdLine).bind() - val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() - ServerConfiguration( - serverListenAddress = InetSocketAddress(listenPort), - kafkaConfiguration = KafkaConfiguration(kafkaServers), - healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), - configurationProviderParams = configurationProviderParams, - securityConfiguration = security, - idleTimeout = Duration.ofSeconds(idleTimeoutSec), - maximumPayloadSizeBytes = maxPayloadSizeBytes, - dummyMode = dummyMode) - }.fix() + Option.monad().binding { + val healthCheckApiPort = cmdLine.intValue( + HEALTH_CHECK_API_PORT, + DefaultValues.HEALTH_CHECK_API_PORT + ) + val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() + val listenPort = cmdLine.intValue(LISTEN_PORT).bind() + val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) + val maxPayloadSizeBytes = cmdLine.intValue( + MAXIMUM_PAYLOAD_SIZE_BYTES, + DefaultValues.MAX_PAYLOAD_SIZE_BYTES + ) + val dummyMode = cmdLine.hasOption(DUMMY_MODE) + val security = createSecurityConfiguration(cmdLine).bind() + val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL) + val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() + ServerConfiguration( + serverListenAddress = InetSocketAddress(listenPort), + kafkaConfiguration = KafkaConfiguration(kafkaServers), + healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), + configurationProviderParams = configurationProviderParams, + securityConfiguration = security, + idleTimeout = Duration.ofSeconds(idleTimeoutSec), + maximumPayloadSizeBytes = maxPayloadSizeBytes, + dummyMode = dummyMode, + logLevel = determineLogLevel(logLevel) + ) + }.fix() + private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> = - Option.monad().binding { - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() - val firstRequestDelay = cmdLine.longValue( - CONSUL_FIRST_REQUEST_DELAY, - DefaultValues.CONSUL_FIRST_REQUEST_DELAY - ) - val requestInterval = cmdLine.longValue( - CONSUL_REQUEST_INTERVAL, - DefaultValues.CONSUL_REQUEST_INTERVAL - ) - ConfigurationProviderParams( - configUrl, - Duration.ofSeconds(firstRequestDelay), - Duration.ofSeconds(requestInterval) - ) - }.fix() + Option.monad().binding { + val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() + val firstRequestDelay = cmdLine.longValue( + CONSUL_FIRST_REQUEST_DELAY, + DefaultValues.CONSUL_FIRST_REQUEST_DELAY + ) + val requestInterval = cmdLine.longValue( + CONSUL_REQUEST_INTERVAL, + DefaultValues.CONSUL_REQUEST_INTERVAL + ) + ConfigurationProviderParams( + configUrl, + Duration.ofSeconds(firstRequestDelay), + Duration.ofSeconds(requestInterval) + ) + }.fix() + + private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel) + .getOrElse { + logger.warn { + "Failed to parse $logLevel as $LOG_LEVEL command line. " + + "Using default log level (${DefaultValues.LOG_LEVEL})" + } + LogLevel.valueOf(DefaultValues.LOG_LEVEL) + } + internal object DefaultValues { const val HEALTH_CHECK_API_PORT = 6060 @@ -119,5 +137,10 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration const val CONSUL_REQUEST_INTERVAL = 5L const val IDLE_TIMEOUT_SEC = 60L const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES + val LOG_LEVEL = LogLevel.INFO.name + } + + companion object { + private val logger = Logger(ArgVesHvConfiguration::class) } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 5c9566c7..c29c5d16 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -31,25 +31,28 @@ import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger -private val logger = Logger("org.onap.dcae.collectors.veshv.main") -private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt" +private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv" +private val logger = Logger("$VESHV_PACKAGE.main") +private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt" fun main(args: Array<String>) = - ArgVesHvConfiguration().parse(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.info { "Gentle shutdown" } } - ) + ArgVesHvConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map(::startAndAwaitServers) + .unsafeRunEitherSync( + { ex -> + logger.withError { log("Failed to start a server", ex) } + ExitFailure(1) + }, + { logger.info { "Gentle shutdown" } } + ) private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - logger.info { "Using configuration: $config" } - HealthCheckServer.start(config).bind() - VesServer.start(config).bind() - .await().bind() - }.fix() + IO.monad().binding { + Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + HealthCheckServer.start(config).bind() + VesServer.start(config).bind() + .await().bind() + }.fix() + diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index c88b8aa8..b54dc361 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -84,10 +84,6 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/> <logger name="reactor.netty" level="WARN"/> <logger name="io.netty" level="DEBUG"/> <logger name="io.netty.util" level="WARN"/> diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index 9dddeca9..03bf44f1 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import java.time.Duration import kotlin.test.assertNotNull @@ -47,6 +48,7 @@ object ArgVesHvConfigurationTest : Spek({ val listenPort = "6969" val keyStorePassword = "kspass" val trustStorePassword = "tspass" + val logLevel = LogLevel.DEBUG.name beforeEachTest { cut = ArgVesHvConfiguration() @@ -58,16 +60,17 @@ object ArgVesHvConfigurationTest : Spek({ beforeEachTest { result = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--config-url", configurationUrl, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "--health-check-api-port", healthCheckApiPort, + "--listen-port", listenPort, + "--config-url", configurationUrl, + "--first-request-delay", firstRequestDelay, + "--request-interval", requestInterval, + "--key-store", "/tmp/keys.p12", + "--trust-store", "/tmp/trust.p12", + "--key-store-password", keyStorePassword, + "--trust-store-password", trustStorePassword, + "--log-level", logLevel ) } @@ -94,17 +97,17 @@ object ArgVesHvConfigurationTest : Spek({ it("should set proper first consul request delay") { assertThat(result.configurationProviderParams.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) + .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) } it("should set proper consul request interval") { assertThat(result.configurationProviderParams.requestInterval) - .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) + .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) } it("should set proper config url") { assertThat(result.configurationProviderParams.configurationUrl) - .isEqualTo(configurationUrl) + .isEqualTo(configurationUrl) } it("should set proper security configuration") { @@ -116,29 +119,78 @@ object ArgVesHvConfigurationTest : Spek({ assertThat(keys.keyStorePassword).isEqualTo(keyStorePassword.toCharArray()) assertThat(keys.trustStorePassword).isEqualTo(trustStorePassword.toCharArray()) } + + it("should set proper log level") { + assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG) + } } describe("required parameter is absent") { on("missing listen port") { it("should throw exception") { - assertThat(cut.parseExpectingFailure( + assertThat( + cut.parseExpectingFailure( "--config-url", configurationUrl, "--ssl-disable", "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval) + "--request-interval", requestInterval + ) ).isInstanceOf(WrongArgumentError::class.java) } } on("missing configuration url") { it("should throw exception") { - assertThat(cut.parseExpectingFailure( + assertThat( + cut.parseExpectingFailure( "--listen-port", listenPort, "--ssl-disable", "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval) + "--request-interval", requestInterval + ) ).isInstanceOf(WrongArgumentError::class.java) } } } + + describe("correct log level not provided") { + on("missing log level") { + it("should set default INFO value") { + val config = cut.parseExpectingSuccess( + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "--health-check-api-port", healthCheckApiPort, + "--listen-port", listenPort, + "--config-url", configurationUrl, + "--first-request-delay", firstRequestDelay, + "--request-interval", requestInterval, + "--key-store", "/tmp/keys.p12", + "--trust-store", "/tmp/trust.p12", + "--key-store-password", keyStorePassword, + "--trust-store-password", trustStorePassword + ) + + assertThat(config.logLevel).isEqualTo(LogLevel.INFO) + } + } + + on("incorrect log level") { + it("should set default INFO value") { + val config = cut.parseExpectingSuccess( + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "--health-check-api-port", healthCheckApiPort, + "--listen-port", listenPort, + "--config-url", configurationUrl, + "--first-request-delay", firstRequestDelay, + "--request-interval", requestInterval, + "--key-store", "/tmp/keys.p12", + "--trust-store", "/tmp/trust.p12", + "--key-store-password", keyStorePassword, + "--trust-store-password", trustStorePassword, + "--log-level", "1" + ) + + assertThat(config.logLevel).isEqualTo(LogLevel.INFO) + } + } + } } })
\ No newline at end of file |