summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt139
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt113
2 files changed, 133 insertions, 119 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index c97486c6..56bb1d84 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -30,7 +30,9 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.commandline.*
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
@@ -55,80 +57,87 @@ import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
- KAFKA_SERVERS,
- HEALTH_CHECK_API_PORT,
- LISTEN_PORT,
- CONSUL_CONFIG_URL,
- CONSUL_FIRST_REQUEST_DELAY,
- CONSUL_REQUEST_INTERVAL,
- SSL_DISABLE,
- KEY_STORE_FILE,
- KEY_STORE_PASSWORD,
- TRUST_STORE_FILE,
- TRUST_STORE_PASSWORD,
- IDLE_TIMEOUT_SEC,
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- DUMMY_MODE,
- LOG_LEVEL
+ KAFKA_SERVERS,
+ HEALTH_CHECK_API_PORT,
+ LISTEN_PORT,
+ CONSUL_CONFIG_URL,
+ CONSUL_FIRST_REQUEST_DELAY,
+ CONSUL_REQUEST_INTERVAL,
+ SSL_DISABLE,
+ KEY_STORE_FILE,
+ KEY_STORE_PASSWORD,
+ TRUST_STORE_FILE,
+ TRUST_STORE_PASSWORD,
+ IDLE_TIMEOUT_SEC,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ DUMMY_MODE,
+ LOG_LEVEL
)
override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
- Option.monad().binding {
- val healthCheckApiPort = cmdLine.intValue(
- HEALTH_CHECK_API_PORT,
- DefaultValues.HEALTH_CHECK_API_PORT
- )
- val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
- val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
- val maxPayloadSizeBytes = cmdLine.intValue(
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- DefaultValues.MAX_PAYLOAD_SIZE_BYTES
- )
- val dummyMode = cmdLine.hasOption(DUMMY_MODE)
- val security = createSecurityConfiguration(cmdLine).bind()
- val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL)
- val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
- ServerConfiguration(
- serverListenAddress = InetSocketAddress(listenPort),
- kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes),
- healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
- configurationProviderParams = configurationProviderParams,
- securityConfiguration = security,
- idleTimeout = Duration.ofSeconds(idleTimeoutSec),
- maximumPayloadSizeBytes = maxPayloadSizeBytes,
- dummyMode = dummyMode,
- logLevel = determineLogLevel(logLevel)
- )
- }.fix()
+ Option.monad().binding {
+ val healthCheckApiPort = cmdLine.intValue(
+ HEALTH_CHECK_API_PORT,
+ DefaultValues.HEALTH_CHECK_API_PORT
+ )
+ val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val maxPayloadSizeBytes = cmdLine.intValue(
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ DefaultValues.MAX_PAYLOAD_SIZE_BYTES
+ )
+ val dummyMode = cmdLine.hasOption(DUMMY_MODE)
+ val security = createSecurityConfiguration(cmdLine)
+ .doOnFailure { ex ->
+ logger.withError(ServiceContext::mdc) {
+ log("Could not read security keys", ex)
+ }
+ }
+ .toOption()
+ .bind()
+ val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL)
+ val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
+ ServerConfiguration(
+ serverListenAddress = InetSocketAddress(listenPort),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes),
+ healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
+ configurationProviderParams = configurationProviderParams,
+ securityConfiguration = security,
+ idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ maximumPayloadSizeBytes = maxPayloadSizeBytes,
+ dummyMode = dummyMode,
+ logLevel = determineLogLevel(logLevel)
+ )
+ }.fix()
private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
- Option.monad().binding {
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
- val firstRequestDelay = cmdLine.longValue(
- CONSUL_FIRST_REQUEST_DELAY,
- DefaultValues.CONSUL_FIRST_REQUEST_DELAY
- )
- val requestInterval = cmdLine.longValue(
- CONSUL_REQUEST_INTERVAL,
- DefaultValues.CONSUL_REQUEST_INTERVAL
- )
- ConfigurationProviderParams(
- configUrl,
- Duration.ofSeconds(firstRequestDelay),
- Duration.ofSeconds(requestInterval)
- )
- }.fix()
+ Option.monad().binding {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+ val firstRequestDelay = cmdLine.longValue(
+ CONSUL_FIRST_REQUEST_DELAY,
+ DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ )
+ val requestInterval = cmdLine.longValue(
+ CONSUL_REQUEST_INTERVAL,
+ DefaultValues.CONSUL_REQUEST_INTERVAL
+ )
+ ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }.fix()
private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel)
- .getOrElse {
- logger.warn {
- "Failed to parse $logLevel as $LOG_LEVEL command line. " +
- "Using default log level (${DefaultValues.LOG_LEVEL})"
+ .getOrElse {
+ logger.warn {
+ "Failed to parse $logLevel as $LOG_LEVEL command line. " +
+ "Using default log level (${DefaultValues.LOG_LEVEL})"
+ }
+ LogLevel.valueOf(DefaultValues.LOG_LEVEL)
}
- LogLevel.valueOf(DefaultValues.LOG_LEVEL)
- }
internal object DefaultValues {
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 03bf44f1..90571ce9 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -25,12 +25,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.JdkKeys
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
import java.time.Duration
import kotlin.test.assertNotNull
@@ -60,17 +60,17 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--config-url", configurationUrl,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword,
- "--log-level", logLevel
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword,
+ "--log-level", logLevel
)
}
@@ -97,27 +97,32 @@ object ArgVesHvConfigurationTest : Spek({
it("should set proper first consul request delay") {
assertThat(result.configurationProviderParams.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
+ .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
}
it("should set proper consul request interval") {
assertThat(result.configurationProviderParams.requestInterval)
- .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
+ .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
}
it("should set proper config url") {
assertThat(result.configurationProviderParams.configurationUrl)
- .isEqualTo(configurationUrl)
+ .isEqualTo(configurationUrl)
}
it("should set proper security configuration") {
- assertThat(result.securityConfiguration.sslDisable).isFalse()
+ assertThat(result.securityConfiguration.keys.isEmpty()).isFalse()
+
+ val keys = result.securityConfiguration.keys.orNull() as SecurityKeys
+ assertNotNull(keys.keyStore())
+ assertNotNull(keys.trustStore())
+ keys.keyStorePassword().useChecked {
+ assertThat(it).isEqualTo(keyStorePassword.toCharArray())
- val keys = result.securityConfiguration.keys.orNull() as JdkKeys
- assertNotNull(keys.keyStore)
- assertNotNull(keys.trustStore)
- assertThat(keys.keyStorePassword).isEqualTo(keyStorePassword.toCharArray())
- assertThat(keys.trustStorePassword).isEqualTo(trustStorePassword.toCharArray())
+ }
+ keys.trustStorePassword().useChecked {
+ assertThat(it).isEqualTo(trustStorePassword.toCharArray())
+ }
}
it("should set proper log level") {
@@ -129,24 +134,24 @@ object ArgVesHvConfigurationTest : Spek({
on("missing listen port") {
it("should throw exception") {
assertThat(
- cut.parseExpectingFailure(
- "--config-url", configurationUrl,
- "--ssl-disable",
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval
- )
+ cut.parseExpectingFailure(
+ "--config-url", configurationUrl,
+ "--ssl-disable",
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval
+ )
).isInstanceOf(WrongArgumentError::class.java)
}
}
on("missing configuration url") {
it("should throw exception") {
assertThat(
- cut.parseExpectingFailure(
- "--listen-port", listenPort,
- "--ssl-disable",
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval
- )
+ cut.parseExpectingFailure(
+ "--listen-port", listenPort,
+ "--ssl-disable",
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval
+ )
).isInstanceOf(WrongArgumentError::class.java)
}
}
@@ -156,16 +161,16 @@ object ArgVesHvConfigurationTest : Spek({
on("missing log level") {
it("should set default INFO value") {
val config = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--config-url", configurationUrl,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword
)
assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
@@ -175,17 +180,17 @@ object ArgVesHvConfigurationTest : Spek({
on("incorrect log level") {
it("should set default INFO value") {
val config = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--config-url", configurationUrl,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword,
- "--log-level", "1"
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "--health-check-api-port", healthCheckApiPort,
+ "--listen-port", listenPort,
+ "--config-url", configurationUrl,
+ "--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
+ "--key-store", "/tmp/keys.p12",
+ "--trust-store", "/tmp/trust.p12",
+ "--key-store-password", keyStorePassword,
+ "--trust-store-password", trustStorePassword,
+ "--log-level", "1"
)
assertThat(config.logLevel).isEqualTo(LogLevel.INFO)