aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main/src')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt139
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt39
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml4
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt86
4 files changed, 171 insertions, 97 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index ae87f1c2..2311b2ba 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Option
import arrow.core.fix
+import arrow.core.getOrElse
import arrow.instances.option.monad.monad
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
@@ -30,7 +31,7 @@ import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.*
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
@@ -45,73 +46,90 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMU
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.utils.commandline.hasOption
-import org.onap.dcae.collectors.veshv.utils.commandline.intValue
-import org.onap.dcae.collectors.veshv.utils.commandline.longValue
-import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LOG_LEVEL
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.net.InetSocketAddress
import java.time.Duration
+
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
- KAFKA_SERVERS,
- HEALTH_CHECK_API_PORT,
- LISTEN_PORT,
- CONSUL_CONFIG_URL,
- CONSUL_FIRST_REQUEST_DELAY,
- CONSUL_REQUEST_INTERVAL,
- SSL_DISABLE,
- KEY_STORE_FILE,
- KEY_STORE_PASSWORD,
- TRUST_STORE_FILE,
- TRUST_STORE_PASSWORD,
- IDLE_TIMEOUT_SEC,
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- DUMMY_MODE
+ KAFKA_SERVERS,
+ HEALTH_CHECK_API_PORT,
+ LISTEN_PORT,
+ CONSUL_CONFIG_URL,
+ CONSUL_FIRST_REQUEST_DELAY,
+ CONSUL_REQUEST_INTERVAL,
+ SSL_DISABLE,
+ KEY_STORE_FILE,
+ KEY_STORE_PASSWORD,
+ TRUST_STORE_FILE,
+ TRUST_STORE_PASSWORD,
+ IDLE_TIMEOUT_SEC,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ DUMMY_MODE,
+ LOG_LEVEL
)
override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
- Option.monad().binding {
- val healthCheckApiPort = cmdLine.intValue(
- HEALTH_CHECK_API_PORT,
- DefaultValues.HEALTH_CHECK_API_PORT
- )
- val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
- val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
- val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
- DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
- val dummyMode = cmdLine.hasOption(DUMMY_MODE)
- val security = createSecurityConfiguration(cmdLine).bind()
- val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
- ServerConfiguration(
- serverListenAddress = InetSocketAddress(listenPort),
- kafkaConfiguration = KafkaConfiguration(kafkaServers),
- healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
- configurationProviderParams = configurationProviderParams,
- securityConfiguration = security,
- idleTimeout = Duration.ofSeconds(idleTimeoutSec),
- maximumPayloadSizeBytes = maxPayloadSizeBytes,
- dummyMode = dummyMode)
- }.fix()
+ Option.monad().binding {
+ val healthCheckApiPort = cmdLine.intValue(
+ HEALTH_CHECK_API_PORT,
+ DefaultValues.HEALTH_CHECK_API_PORT
+ )
+ val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val maxPayloadSizeBytes = cmdLine.intValue(
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ DefaultValues.MAX_PAYLOAD_SIZE_BYTES
+ )
+ val dummyMode = cmdLine.hasOption(DUMMY_MODE)
+ val security = createSecurityConfiguration(cmdLine).bind()
+ val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL)
+ val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
+ ServerConfiguration(
+ serverListenAddress = InetSocketAddress(listenPort),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers),
+ healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
+ configurationProviderParams = configurationProviderParams,
+ securityConfiguration = security,
+ idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ maximumPayloadSizeBytes = maxPayloadSizeBytes,
+ dummyMode = dummyMode,
+ logLevel = determineLogLevel(logLevel)
+ )
+ }.fix()
+
private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
- Option.monad().binding {
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
- val firstRequestDelay = cmdLine.longValue(
- CONSUL_FIRST_REQUEST_DELAY,
- DefaultValues.CONSUL_FIRST_REQUEST_DELAY
- )
- val requestInterval = cmdLine.longValue(
- CONSUL_REQUEST_INTERVAL,
- DefaultValues.CONSUL_REQUEST_INTERVAL
- )
- ConfigurationProviderParams(
- configUrl,
- Duration.ofSeconds(firstRequestDelay),
- Duration.ofSeconds(requestInterval)
- )
- }.fix()
+ Option.monad().binding {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+ val firstRequestDelay = cmdLine.longValue(
+ CONSUL_FIRST_REQUEST_DELAY,
+ DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ )
+ val requestInterval = cmdLine.longValue(
+ CONSUL_REQUEST_INTERVAL,
+ DefaultValues.CONSUL_REQUEST_INTERVAL
+ )
+ ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }.fix()
+
+ private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel)
+ .getOrElse {
+ logger.warn {
+ "Failed to parse $logLevel as $LOG_LEVEL command line. " +
+ "Using default log level (${DefaultValues.LOG_LEVEL})"
+ }
+ LogLevel.valueOf(DefaultValues.LOG_LEVEL)
+ }
+
internal object DefaultValues {
const val HEALTH_CHECK_API_PORT = 6060
@@ -119,5 +137,10 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
const val CONSUL_REQUEST_INTERVAL = 5L
const val IDLE_TIMEOUT_SEC = 60L
const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
+ val LOG_LEVEL = LogLevel.INFO.name
+ }
+
+ companion object {
+ private val logger = Logger(ArgVesHvConfiguration::class)
}
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 5c9566c7..c29c5d16 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -31,25 +31,28 @@ import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-private val logger = Logger("org.onap.dcae.collectors.veshv.main")
-private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
+private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv"
+private val logger = Logger("$VESHV_PACKAGE.main")
+private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
fun main(args: Array<String>) =
- ArgVesHvConfiguration().parse(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startAndAwaitServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- { logger.info { "Gentle shutdown" } }
- )
+ ArgVesHvConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startAndAwaitServers)
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.withError { log("Failed to start a server", ex) }
+ ExitFailure(1)
+ },
+ { logger.info { "Gentle shutdown" } }
+ )
private fun startAndAwaitServers(config: ServerConfiguration) =
- IO.monad().binding {
- logger.info { "Using configuration: $config" }
- HealthCheckServer.start(config).bind()
- VesServer.start(config).bind()
- .await().bind()
- }.fix()
+ IO.monad().binding {
+ Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
+ logger.info { "Using configuration: $config" }
+ HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind()
+ .await().bind()
+ }.fix()
+
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index c88b8aa8..b54dc361 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -84,10 +84,6 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
<logger name="reactor.netty" level="WARN"/>
<logger name="io.netty" level="DEBUG"/>
<logger name="io.netty.util" level="WARN"/>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 9dddeca9..03bf44f1 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import java.time.Duration
import kotlin.test.assertNotNull
@@ -47,6 +48,7 @@ object ArgVesHvConfigurationTest : Spek({
val listenPort = "6969"
val keyStorePassword = "kspass"
val trustStorePassword = "tspass"
+ val logLevel = LogLevel.DEBUG.name
beforeEachTest {
cut = ArgVesHvConfiguration()
@@ -58,16 +60,17 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--config-url", configurationUrl,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword,
+ "--log-level", logLevel
)
}
@@ -94,17 +97,17 @@ object ArgVesHvConfigurationTest : Spek({
it("should set proper first consul request delay") {
assertThat(result.configurationProviderParams.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
+ .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
}
it("should set proper consul request interval") {
assertThat(result.configurationProviderParams.requestInterval)
- .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
+ .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
}
it("should set proper config url") {
assertThat(result.configurationProviderParams.configurationUrl)
- .isEqualTo(configurationUrl)
+ .isEqualTo(configurationUrl)
}
it("should set proper security configuration") {
@@ -116,29 +119,78 @@ object ArgVesHvConfigurationTest : Spek({
assertThat(keys.keyStorePassword).isEqualTo(keyStorePassword.toCharArray())
assertThat(keys.trustStorePassword).isEqualTo(trustStorePassword.toCharArray())
}
+
+ it("should set proper log level") {
+ assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG)
+ }
}
describe("required parameter is absent") {
on("missing listen port") {
it("should throw exception") {
- assertThat(cut.parseExpectingFailure(
+ assertThat(
+ cut.parseExpectingFailure(
"--config-url", configurationUrl,
"--ssl-disable",
"--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval)
+ "--request-interval", requestInterval
+ )
).isInstanceOf(WrongArgumentError::class.java)
}
}
on("missing configuration url") {
it("should throw exception") {
- assertThat(cut.parseExpectingFailure(
+ assertThat(
+ cut.parseExpectingFailure(
"--listen-port", listenPort,
"--ssl-disable",
"--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval)
+ "--request-interval", requestInterval
+ )
).isInstanceOf(WrongArgumentError::class.java)
}
}
}
+
+ describe("correct log level not provided") {
+ on("missing log level") {
+ it("should set default INFO value") {
+ val config = cut.parseExpectingSuccess(
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword
+ )
+
+ assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
+ }
+ }
+
+ on("incorrect log level") {
+ it("should set default INFO value") {
+ val config = cut.parseExpectingSuccess(
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword,
+ "--log-level", "1"
+ )
+
+ assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
+ }
+ }
+ }
}
}) \ No newline at end of file