diff options
Diffstat (limited to 'sources')
46 files changed, 914 insertions, 868 deletions
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt index 1e45c923..93d42f96 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt @@ -33,25 +33,24 @@ import java.nio.file.Paths abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) { abstract val cmdLineOptionsList: List<CommandLineOption> - fun parse(args: Array<out String>): Either<WrongArgumentError, T> { - val parseResult = Try { - val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption) - parser.parse(commandLineOptions, args) - } - return parseResult - .toEither() - .mapLeft { ex -> WrongArgumentError(ex, cmdLineOptionsList) } - .map(this::getConfiguration) - .flatMap { - it.toEither { - WrongArgumentError( - message = "Unexpected error when parsing command line arguments", - cmdLineOptionsList = cmdLineOptionsList) - } - } - } - protected abstract fun getConfiguration(cmdLine: CommandLine): Option<T> - protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI()) + fun parse(args: Array<out String>): Either<WrongArgumentError, T> = + Try { parseArgumentsArray(args) } + .toEither() + .mapLeft { WrongArgumentError(it, cmdLineOptionsList) } + .map(this::getConfiguration) + .flatMap { + it.toEither { + WrongArgumentError( + message = "Unexpected error when parsing command line arguments", + cmdLineOptionsList = cmdLineOptionsList) + } + } + + private fun parseArgumentsArray(args: Array<out String>) = + cmdLineOptionsList + .map { it.option } + .fold(Options(), Options::addOption) + .let { parser.parse(it, args) } } diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt index 31849215..1c1a355b 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt @@ -24,78 +24,72 @@ import org.apache.commons.cli.Option enum class CommandLineOption(val option: Option, val required: Boolean = false) { HEALTH_CHECK_API_PORT( - Option.builder("H") - .longOpt("health-check-api-port") - .hasArg() - .desc("Health check rest api listen port") - .build() - ), - LISTEN_PORT( - Option.builder("p") - .longOpt("listen-port") - .hasArg() - .desc("Listen port") - .build(), - required = true + Option.builder("H") + .longOpt("health-check-api-port") + .hasArg() + .desc("Health check rest api listen port") + .build() ), - CONFIGURATION_FIRST_REQUEST_DELAY( - Option.builder("d") - .longOpt("first-request-delay") - .hasArg() - .desc("Delay of first request for configuration in seconds") - .build() + CONFIGURATION_FILE( + Option.builder("c") + .longOpt("configuration-file") + .hasArg() + .desc("Json file containing HV-VES configuration") + .build(), + required = true ), - CONFIGURATION_REQUEST_INTERVAL( - Option.builder("I") - .longOpt("request-interval") - .hasArg() - .desc("Interval of configuration requests in seconds") - .build() + LISTEN_PORT( + Option.builder("p") + .longOpt("listen-port") + .hasArg() + .desc("Listen port") + .build(), + required = true ), VES_HV_PORT( - Option.builder("v") - .longOpt("ves-port") - .hasArg() - .desc("VesHvCollector port") - .build(), - required = true + Option.builder("v") + .longOpt("ves-port") + .hasArg() + .desc("VesHvCollector port") + .build(), + required = true ), VES_HV_HOST( - Option.builder("h") - .longOpt("ves-host") - .hasArg() - .desc("VesHvCollector host") - .build(), - required = true + Option.builder("h") + .longOpt("ves-host") + .hasArg() + .desc("VesHvCollector host") + .build(), + required = true ), KAFKA_SERVERS( - Option.builder("s") - .longOpt("kafka-bootstrap-servers") - .hasArg() - .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format") - .build(), - required = true + Option.builder("s") + .longOpt("kafka-bootstrap-servers") + .hasArg() + .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format") + .build(), + required = true ), KAFKA_TOPICS( - Option.builder("f") - .longOpt("kafka-topics") - .hasArg() - .desc("Comma-separated Kafka topics") - .build(), - required = true + Option.builder("f") + .longOpt("kafka-topics") + .hasArg() + .desc("Comma-separated Kafka topics") + .build(), + required = true ), SSL_DISABLE( - Option.builder("l") - .longOpt("ssl-disable") - .desc("Disable SSL encryption") - .build() + Option.builder("l") + .longOpt("ssl-disable") + .desc("Disable SSL encryption") + .build() ), KEY_STORE_FILE( - Option.builder("k") - .longOpt("key-store") - .hasArg() - .desc("Key store in PKCS12 format") - .build() + Option.builder("k") + .longOpt("key-store") + .hasArg() + .desc("Key store in PKCS12 format") + .build() ), KEY_STORE_PASSWORD( Option.builder("kp") @@ -105,54 +99,31 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false) .build() ), TRUST_STORE_FILE( - Option.builder("t") - .longOpt("trust-store") - .hasArg() - .desc("File with trusted certificate bundle in PKCS12 format") - .build() + Option.builder("t") + .longOpt("trust-store") + .hasArg() + .desc("File with trusted certificate bundle in PKCS12 format") + .build() ), TRUST_STORE_PASSWORD( - Option.builder("tp") - .longOpt("trust-store-password") - .hasArg() - .desc("Trust store password") - .build() - ), - IDLE_TIMEOUT_SEC( - Option.builder("i") - .longOpt("idle-timeout-sec") - .hasArg() - .desc( - """Idle timeout for remote hosts. After given time without any data exchange the - |connection might be closed.""".trimMargin() - ) - .build() + Option.builder("tp") + .longOpt("trust-store-password") + .hasArg() + .desc("Trust store password") + .build() ), MAXIMUM_PAYLOAD_SIZE_BYTES( - Option.builder("m") - .longOpt("max-payload-size") - .hasArg() - .desc("Maximum supported payload size in bytes") - .build() - ), - LOG_LEVEL( - Option.builder("ll") - .longOpt("log-level") - .hasArg() - .desc("Log level") - .build() - ), - DUMMY_MODE( - Option.builder("u") - .longOpt("dummy") - .desc("If present will start in dummy mode (dummy external services)") - .build() + Option.builder("m") + .longOpt("max-payload-size") + .hasArg() + .desc("Maximum supported payload size in bytes") + .build() ); fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String = - option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> - "${prefix}_${mainPart}" - } + option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> + "${prefix}_${mainPart}" + } companion object { private const val DEFAULT_ENV_PREFIX = "VESHV" diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt index c0fbcde6..48cac69a 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt @@ -32,13 +32,13 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain * @since June 2018 */ +val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried() + fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO<Unit> = IO { err.printMessage() err.printHelp(programName) }.flatMap { ExitFailure(2).io() } -val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried() - fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = longValue(cmdLineOpt).getOrElse { default } diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt index 736710ff..6614e77f 100644 --- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt +++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt @@ -25,8 +25,7 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.* /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -49,13 +48,13 @@ class CommandLineOptionTest : Spek({ } given("sample option without prefix") { - val opt = DUMMY_MODE + val opt = SSL_DISABLE on("calling environmentVariableName") { val result = opt.environmentVariableName() it("should return prefixed upper snake cased long option name") { - assertThat(result).isEqualTo("VESHV_DUMMY") + assertThat(result).isEqualTo("VESHV_SSL_DISABLE") } } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index 874cc5b0..5b547a28 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -19,13 +19,25 @@ */ package org.onap.dcae.collectors.veshv.config.api -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException +import org.onap.dcae.collectors.veshv.config.impl.ArgHvVesConfiguration +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator +import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader +import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import reactor.core.publisher.Flux class ConfigurationModule { - fun createConfigurationFromCommandLine(args: Array<String>) = - ArgVesHvConfiguration().parse(args) - fun hvVesConfigurationUpdates(): Flux<ServerConfiguration> = Flux.never<ServerConfiguration>() + private val cmd = ArgHvVesConfiguration() + private val configReader = FileConfigurationReader() + private val configValidator = ConfigurationValidator() + + fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> = + Flux.just(cmd.parse(args)) + .throwOnLeft { MissingArgumentException(it.message, it.cause) } + .map { configReader.loadConfig(it.reader()) } + .map { configValidator.validate(it) } + .throwOnLeft { ValidationException(it.message) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt index 8fbc649b..566f2c08 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt @@ -21,22 +21,34 @@ package org.onap.dcae.collectors.veshv.config.api.model import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.net.InetSocketAddress import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ +data class HvVesConfiguration( + val server: ServerConfiguration, + val cbs: CbsConfiguration, + val security: SecurityConfiguration, + val collector: CollectorConfiguration, + val logLevel: LogLevel +) + data class ServerConfiguration( - val serverListenAddress: InetSocketAddress, - val kafkaConfiguration: KafkaConfiguration, - val configurationProviderParams: ConfigurationProviderParams, - val securityConfiguration: SecurityConfiguration, + val listenPort: Int, val idleTimeout: Duration, - val healthCheckApiListenAddress: InetSocketAddress, - val maximumPayloadSizeBytes: Int, - val logLevel: LogLevel, - val dummyMode: Boolean = false + val maxPayloadSizeBytes: Int ) +data class CbsConfiguration( + val firstRequestDelay: Duration, + val requestInterval: Duration +) + +data class CollectorConfiguration( + val maxRequestSizeBytes: Int, + val kafkaServers: String, + val routing: Routing, + val dummyMode: Boolean = false +) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt index 7999451e..2fc29829 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,6 @@ */ package org.onap.dcae.collectors.veshv.config.api.model -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class CollectorConfiguration(val routing: Routing) +class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) + +class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt index 847f35ad..aab8ecad 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ data class Routing(val routes: List<Route>) { Option.fromNullable(routes.find { it.applies(commonHeader) }) } -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = {0}) { +data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) { fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain @@ -40,24 +40,17 @@ data class Route(val domain: String, val targetTopic: String, val partitioning: /* -Configuration DSL - */ +HvVesConfiguration DSL +*/ -fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder { - val conf = RoutingBuilder() - conf.init() - return conf -} +fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init) class RoutingBuilder { private val routes: MutableList<RouteBuilder> = mutableListOf() - fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder { - val rule = RouteBuilder() - rule.init() - routes.add(rule) - return rule - } + fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder() + .apply(init) + .also { routes.add(it) } fun build() = Routing(routes.map { it.build() }.toList()) } @@ -68,18 +61,17 @@ class RouteBuilder { private lateinit var targetTopic: String private lateinit var partitioning: (CommonEventHeader) -> Int - fun fromDomain(domain: String) : RouteBuilder = apply { + fun fromDomain(domain: String): RouteBuilder = apply { this.domain = domain } - fun toTopic(targetTopic: String) : RouteBuilder = apply { + fun toTopic(targetTopic: String): RouteBuilder = apply { this.targetTopic = targetTopic } - fun withFixedPartitioning(num: Int = 0) : RouteBuilder = apply { + fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply { partitioning = { num } } fun build() = Route(domain, targetTopic, partitioning) - } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt new file mode 100644 index 00000000..9587d5b0 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Option +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE +import org.onap.dcae.collectors.veshv.commandline.stringValue +import java.io.File + +internal class ArgHvVesConfiguration : ArgBasedConfiguration<File>(DefaultParser()) { + override val cmdLineOptionsList = listOf(CONFIGURATION_FILE) + + override fun getConfiguration(cmdLine: CommandLine): Option<File> = + cmdLine.stringValue(CONFIGURATION_FILE).map(::File) + +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt deleted file mode 100644 index 08346d30..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt +++ /dev/null @@ -1,151 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import arrow.core.Option -import arrow.core.fix -import arrow.core.getOrElse -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding -import org.apache.commons.cli.CommandLine -import org.apache.commons.cli.DefaultParser -import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_FILE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LOG_LEVEL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.SSL_DISABLE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_FILE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.commandline.hasOption -import org.onap.dcae.collectors.veshv.commandline.intValue -import org.onap.dcae.collectors.veshv.commandline.longValue -import org.onap.dcae.collectors.veshv.commandline.stringValue -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.net.InetSocketAddress -import java.time.Duration - - -internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { - override val cmdLineOptionsList = listOf( - KAFKA_SERVERS, - HEALTH_CHECK_API_PORT, - LISTEN_PORT, - CONFIGURATION_FIRST_REQUEST_DELAY, - CONFIGURATION_REQUEST_INTERVAL, - SSL_DISABLE, - KEY_STORE_FILE, - KEY_STORE_PASSWORD, - TRUST_STORE_FILE, - TRUST_STORE_PASSWORD, - IDLE_TIMEOUT_SEC, - MAXIMUM_PAYLOAD_SIZE_BYTES, - DUMMY_MODE, - LOG_LEVEL - ) - - override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> = - Option.monad().binding { - val healthCheckApiPort = cmdLine.intValue( - HEALTH_CHECK_API_PORT, - DefaultValues.HEALTH_CHECK_API_PORT - ) - val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() - val listenPort = cmdLine.intValue(LISTEN_PORT).bind() - val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) - val maxPayloadSizeBytes = cmdLine.intValue( - MAXIMUM_PAYLOAD_SIZE_BYTES, - DefaultValues.MAX_PAYLOAD_SIZE_BYTES - ) - val dummyMode = cmdLine.hasOption(DUMMY_MODE) - val security = createSecurityConfiguration(cmdLine) - .doOnFailure { ex -> - logger.withError { log("Could not read security keys", ex) } - } - .toOption() - .bind() - val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL) - val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() - ServerConfiguration( - serverListenAddress = InetSocketAddress(listenPort), - kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes), - healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), - configurationProviderParams = configurationProviderParams, - securityConfiguration = security, - idleTimeout = Duration.ofSeconds(idleTimeoutSec), - maximumPayloadSizeBytes = maxPayloadSizeBytes, - dummyMode = dummyMode, - logLevel = determineLogLevel(logLevel) - ) - }.fix() - - private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> = - Option.monad().binding { - val firstRequestDelay = cmdLine.longValue( - CONFIGURATION_FIRST_REQUEST_DELAY, - DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY - ) - val requestInterval = cmdLine.longValue( - CONFIGURATION_REQUEST_INTERVAL, - DefaultValues.CONFIGURATION_REQUEST_INTERVAL - ) - ConfigurationProviderParams( - Duration.ofSeconds(firstRequestDelay), - Duration.ofSeconds(requestInterval) - ) - }.fix() - - private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel) - .getOrElse { - logger.warn { - "Failed to parse $logLevel as $LOG_LEVEL command line. " + - "Using default log level (${DefaultValues.LOG_LEVEL})" - } - LogLevel.valueOf(DefaultValues.LOG_LEVEL) - } - - - internal object DefaultValues { - const val HEALTH_CHECK_API_PORT = 6060 - const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L - const val CONFIGURATION_REQUEST_INTERVAL = 5L - const val IDLE_TIMEOUT_SEC = 60L - const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES - val LOG_LEVEL = LogLevel.INFO.name - } - - companion object { - private val logger = Logger(ArgVesHvConfiguration::class) - } -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt new file mode 100644 index 00000000..6c74c33f --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Either +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding +import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.net.InetSocketAddress +import java.time.Duration + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since March 2019 + */ +internal class ConfigurationValidator { + + fun validate(partialConfig: PartialConfiguration) + : Either<ValidationError, HvVesConfiguration> = binding { + val logLevel = determineLogLevel(partialConfig.logLevel) + + val serverConfiguration = partialConfig.server.bind() + .let { createServerConfiguration(it).bind() } + + val cbsConfiguration = partialConfig.cbs.bind() + .let { createCbsConfiguration(it).bind() } + + val securityConfiguration = partialConfig.security.bind() + .let { createSecurityConfiguration(it).bind() } + + val collectorConfiguration = partialConfig.collector.bind() + .let { createCollectorConfig(it).bind() } + + HvVesConfiguration( + serverConfiguration, + cbsConfiguration, + securityConfiguration, + collectorConfiguration, + logLevel + ) + }.toEither { ValidationError("Some required configuration options are missing") } + + private fun determineLogLevel(logLevel: Option<LogLevel>) = + logLevel.getOrElse { + logger.warn { + "Missing or invalid \"logLevel\" field. " + + "Using default log level ($DEFAULT_LOG_LEVEL)" + } + DEFAULT_LOG_LEVEL + } + + private fun createServerConfiguration(partial: PartialServerConfig) = + partial.mapBinding { + ServerConfiguration( + it.listenPort.bind(), + Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()), + it.maxPayloadSizeBytes.bind() + ) + } + + private fun createCbsConfiguration(partial: PartialCbsConfig) = + partial.mapBinding { + CbsConfiguration( + Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()), + Duration.ofSeconds(it.requestIntervalSec.bind().toLong()) + ) + } + + private fun createSecurityConfiguration(partial: PartialSecurityConfig) = + partial.keys.map { SecurityConfiguration(Some(it)) } + + private fun createCollectorConfig(partial: PartialCollectorConfig) = + partial.mapBinding { + CollectorConfiguration( + it.maxRequestSizeBytes.bind(), + toKafkaServersString(it.kafkaServers.bind()), + it.routing.bind(), + it.dummyMode.bind() + ) + } + + private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String = + kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" } + + companion object { + val DEFAULT_LOG_LEVEL = LogLevel.INFO + private val logger = Logger(ConfigurationValidator::class) + } +} + +data class ValidationError(val message: String, val cause: Option<Throwable> = None) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt index 8d9cca73..3e6df3e0 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.config.impl +import arrow.core.None import arrow.core.Option import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.LogLevel @@ -30,31 +31,29 @@ import java.net.InetSocketAddress * @since February 2019 */ internal data class PartialConfiguration( - val server : Option<PartialServerConfig>, - val cbs : Option<PartialCbsConfig>, - val security : Option<PartialSecurityConfig>, - val kafka : Option<PartialKafkaConfig>, - val logLevel : Option<LogLevel> + val server: Option<PartialServerConfig> = None, + val cbs: Option<PartialCbsConfig> = None, + val security: Option<PartialSecurityConfig> = None, + val collector: Option<PartialCollectorConfig> = None, + val logLevel: Option<LogLevel> = None ) internal data class PartialServerConfig( - val healthCheckApiPort : Option<Int>, - val listenPort : Option<Int>, - val idleTimeoutSec : Option<Int>, - val maximumPayloadSizeBytes : Option<Int>, - val dummyMode : Option<Boolean> + val listenPort: Option<Int> = None, + val idleTimeoutSec: Option<Int> = None, + val maxPayloadSizeBytes: Option<Int> = None ) internal data class PartialCbsConfig( - val firstRequestDelaySec : Option<Int>, - val requestIntervalSec : Option<Int> + val firstRequestDelaySec: Option<Int> = None, + val requestIntervalSec: Option<Int> = None ) -internal data class PartialSecurityConfig( - val sslDisable : Option<Boolean>, - val keys : Option<SecurityKeys>) +internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None) -internal data class PartialKafkaConfig( - val kafkaServers : Option<Array<InetSocketAddress>>, - val routing : Option<Routing> +internal data class PartialCollectorConfig( + val dummyMode: Option<Boolean> = None, + val maxRequestSizeBytes: Option<Int> = None, + val kafkaServers: Option<List<InetSocketAddress>> = None, + val routing: Option<Routing> = None ) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt deleted file mode 100644 index 0f8d8d0e..00000000 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt +++ /dev/null @@ -1,192 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.time.Duration -import kotlin.test.assertNotNull - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object ArgVesHvConfigurationTest : Spek({ - lateinit var cut: ArgVesHvConfiguration - val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" - val healthCheckApiPort = "6070" - val firstRequestDelay = "10" - val requestInterval = "5" - val listenPort = "6969" - val keyStorePassword = "kspass" - val trustStorePassword = "tspass" - val logLevel = LogLevel.DEBUG.name - - beforeEachTest { - cut = ArgVesHvConfiguration() - } - - describe("parsing arguments") { - given("all parameters are present in the long form") { - lateinit var result: ServerConfiguration - - beforeEachTest { - result = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword, - "--log-level", logLevel - ) - } - - it("should set proper kafka bootstrap servers") { - assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers) - } - - it("should set proper listen port") { - assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt()) - } - - - it("should set default listen address") { - assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0") - } - - it("should set proper health check api port") { - assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt()) - } - - it("should set default health check api address") { - assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0") - } - - it("should set proper first request delay") { - assertThat(result.configurationProviderParams.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) - } - - it("should set proper request interval") { - assertThat(result.configurationProviderParams.requestInterval) - .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) - } - - it("should set proper security configuration") { - assertThat(result.securityConfiguration.keys.isEmpty()).isFalse() - - val keys = result.securityConfiguration.keys.orNull() as SecurityKeys - assertNotNull(keys.keyStore()) - assertNotNull(keys.trustStore()) - keys.keyStorePassword().useChecked { - assertThat(it).isEqualTo(keyStorePassword.toCharArray()) - - } - keys.trustStorePassword().useChecked { - assertThat(it).isEqualTo(trustStorePassword.toCharArray()) - } - } - - it("should set proper log level") { - assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG) - } - } - - describe("required parameter is absent") { - on("missing listen port") { - it("should throw exception") { - assertThat( - cut.parseExpectingFailure( - "--ssl-disable", - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval - ) - ).isInstanceOf(WrongArgumentError::class.java) - } - } - on("missing configuration url") { - it("should throw exception") { - assertThat( - cut.parseExpectingFailure( - "--listen-port", listenPort, - "--ssl-disable", - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval - ) - ).isInstanceOf(WrongArgumentError::class.java) - } - } - } - - describe("correct log level not provided") { - on("missing log level") { - it("should set default INFO value") { - val config = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword - ) - - assertThat(config.logLevel).isEqualTo(LogLevel.INFO) - } - } - - on("incorrect log level") { - it("should set default INFO value") { - val config = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword, - "--log-level", "1" - ) - - assertThat(config.logLevel).isEqualTo(LogLevel.INFO) - } - } - } - } -}) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt new file mode 100644 index 00000000..dbe757c4 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError +import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess +import java.io.File + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object ArgVesHvConfigurationTest : Spek({ + lateinit var cut: ArgHvVesConfiguration + val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json") + + beforeEachTest { + cut = ArgHvVesConfiguration() + } + + describe("parsing arguments") { + given("all parameters are present in the long form") { + lateinit var result: File + + beforeEachTest { + result = cut.parseExpectingSuccess( + "--configuration-file", configFilePath + ) + } + + it("should read proper configuration file") { + assertThat(result.exists()).isTrue() + } + } + + describe("required parameter is absent") { + on("missing configuration file path") { + it("should throw exception") { + assertThat( + cut.parseExpectingFailure( + "--non-existing-option", "" + ) + ).isInstanceOf(WrongArgumentError::class.java) + } + } + } + } +})
\ No newline at end of file diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt new file mode 100644 index 00000000..12396d23 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -0,0 +1,138 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys +import java.time.Duration + +internal object ConfigurationValidatorTest : Spek({ + describe("ConfigurationValidator") { + val cut = ConfigurationValidator() + + describe("validating partial configuration with missing fields") { + val config = PartialConfiguration( + Some(PartialServerConfig(listenPort = Some(1))) + ) + + it("should return ValidationError") { + val result = cut.validate(config) + assertThat(result.isLeft()).isTrue() + } + } + + describe("validating configuration with empty log level") { + val config = PartialConfiguration( + Some(PartialServerConfig( + Some(1), + Some(2), + Some(3) + )), + Some(PartialCbsConfig( + Some(5), + Some(3) + )), + Some(PartialSecurityConfig( + Some(mock()) + )), + Some(PartialCollectorConfig( + Some(true), + Some(4), + Some(emptyList()), + Some(routing { }.build()) + )), + None + ) + + it("should use default log level") { + val result = cut.validate(config) + result.fold( + { + fail("Configuration should have been created successfully") + }, + { + assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL) + } + ) + } + } + + describe("validating complete configuration") { + val idleTimeoutSec = 10 + val firstReqDelaySec = 10 + val securityKeys = Some(mock<SecurityKeys>()) + val routing = routing { }.build() + + val config = PartialConfiguration( + Some(PartialServerConfig( + Some(1), + Some(idleTimeoutSec), + Some(2) + )), + Some(PartialCbsConfig( + Some(firstReqDelaySec), + Some(3) + )), + Some(PartialSecurityConfig( + securityKeys + )), + Some(PartialCollectorConfig( + Some(true), + Some(4), + Some(emptyList()), + Some(routing) + )), + Some(LogLevel.INFO) + ) + + it("should create valid configuration") { + val result = cut.validate(config) + result.fold( + { + fail("Configuration should have been created successfully") + }, + { + assertThat(it.server.idleTimeout) + .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong())) + + assertThat(it.security.keys) + .isEqualTo(securityKeys) + + assertThat(it.cbs.firstRequestDelay) + .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong())) + + assertThat(it.collector.routing) + .isEqualTo(routing) + } + ) + } + } + } +}) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt index 55da7d02..ab99f13c 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt @@ -17,21 +17,16 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.config.api +package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Some -import org.jetbrains.spek.api.Spek import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader -import org.onap.dcae.collectors.veshv.config.impl.PartialCbsConfig -import org.onap.dcae.collectors.veshv.config.impl.PartialKafkaConfig -import org.onap.dcae.collectors.veshv.config.impl.PartialSecurityConfig -import org.onap.dcae.collectors.veshv.config.impl.PartialServerConfig +import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.io.InputStreamReader import java.io.StringReader import java.net.InetSocketAddress @@ -41,12 +36,13 @@ import java.net.InetSocketAddress */ internal object FileConfigurationReaderTest : Spek({ describe("A configuration loader utility") { + val cut = FileConfigurationReader() describe("partial configuration loading") { it("parses enumerations") { val input = """{"logLevel":"ERROR"}""" - val config = FileConfigurationReader().loadConfig(StringReader(input)) + val config = cut.loadConfig(StringReader(input)) assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) } @@ -58,14 +54,13 @@ internal object FileConfigurationReaderTest : Spek({ } } """.trimIndent() - val config = FileConfigurationReader().loadConfig(StringReader(input)) + val config = cut.loadConfig(StringReader(input)) assertThat(config.server.nonEmpty()).isTrue() - assertThat(config.server.orNull()?.healthCheckApiPort).isEqualTo(Some(12002)) assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003)) } it("parses ip address") { - val input = """{ "kafka" : { + val input = """{ "collector" : { "kafkaServers": [ "192.168.255.1:5005", "192.168.255.26:5006" @@ -73,13 +68,13 @@ internal object FileConfigurationReaderTest : Spek({ } }""" - val config = FileConfigurationReader().loadConfig(StringReader(input)) - assertThat(config.kafka.nonEmpty()).isTrue() - val kafka = config.kafka.orNull() as PartialKafkaConfig - assertThat(kafka.kafkaServers.nonEmpty()).isTrue() - val addresses = kafka.kafkaServers.orNull() as Array<InetSocketAddress> + val config = cut.loadConfig(StringReader(input)) + assertThat(config.collector.nonEmpty()).isTrue() + val collector = config.collector.orNull() as PartialCollectorConfig + assertThat(collector.kafkaServers.nonEmpty()).isTrue() + val addresses = collector.kafkaServers.orNull() as List<InetSocketAddress> assertThat(addresses) - .isEqualTo(arrayOf( + .isEqualTo(listOf( InetSocketAddress("192.168.255.1", 5005), InetSocketAddress("192.168.255.26", 5006) )) @@ -87,7 +82,7 @@ internal object FileConfigurationReaderTest : Spek({ it("parses routing array with RoutingAdapter") { val input = """{ - "kafka" : { + "collector" : { "routing" : [ { "fromDomain": "perf3gpp", @@ -96,30 +91,38 @@ internal object FileConfigurationReaderTest : Spek({ ] } }""".trimIndent() - val config = FileConfigurationReader().loadConfig(StringReader(input)) - assertThat(config.kafka.nonEmpty()).isTrue() - val kafka = config.kafka.orNull() as PartialKafkaConfig - assertThat(kafka.routing.nonEmpty()).isTrue() - val routing = kafka.routing.orNull() as Routing + val config = cut.loadConfig(StringReader(input)) + assertThat(config.collector.nonEmpty()).isTrue() + val collector = config.collector.orNull() as PartialCollectorConfig + assertThat(collector.routing.nonEmpty()).isTrue() + val routing = collector.routing.orNull() as Routing routing.run { assertThat(routes.size).isEqualTo(1) assertThat(routes[0].domain).isEqualTo("perf3gpp") assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP") } } + + it("parses invalid log level string to empty option") { + val input = """{ + "logLevel": something + }""".trimMargin() + val config = cut.loadConfig(input.reader()) + + assertThat(config.logLevel.isEmpty()) + } } describe("complete file loading") { it("loads actual file") { - val config = FileConfigurationReader().loadConfig( - InputStreamReader( - FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json"))) + val config = cut.loadConfig( + javaClass.resourceAsStream("/sampleConfig.json")) + assertThat(config).isNotNull assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) assertThat(config.security.nonEmpty()).isTrue() val security = config.security.orNull() as PartialSecurityConfig - assertThat(security.sslDisable.orNull()).isFalse() assertThat(security.keys.nonEmpty()).isTrue() assertThat(config.cbs.nonEmpty()).isTrue() @@ -127,21 +130,24 @@ internal object FileConfigurationReaderTest : Spek({ assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7)) assertThat(cbs.requestIntervalSec).isEqualTo(Some(900)) - assertThat(config.kafka.nonEmpty()).isTrue() - val kafka = config.kafka.orNull() as PartialKafkaConfig - assertThat(kafka.kafkaServers.nonEmpty()).isTrue() - assertThat(kafka.routing.nonEmpty()).isTrue() + assertThat(config.collector.nonEmpty()).isTrue() + val collector = config.collector.orNull() as PartialCollectorConfig + collector.run { + assertThat(dummyMode).isEqualTo(Some(false)) + assertThat(maxRequestSizeBytes).isEqualTo(Some(512000)) + assertThat(kafkaServers.nonEmpty()).isTrue() + assertThat(routing.nonEmpty()).isTrue() + } assertThat(config.server.nonEmpty()).isTrue() val server = config.server.orNull() as PartialServerConfig server.run { - assertThat(dummyMode).isEqualTo(Some(false)) - assertThat(healthCheckApiPort).isEqualTo(Some(5000)) assertThat(idleTimeoutSec).isEqualTo(Some(1200)) assertThat(listenPort).isEqualTo(Some(6000)) - assertThat(maximumPayloadSizeBytes).isEqualTo(Some(512000)) + assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000)) } } } } -})
\ No newline at end of file +}) + diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json index b64df05a..b49085e8 100644 --- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json +++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json @@ -1,16 +1,16 @@ { - "server" : { - "healthCheckApiPort" : 5000, - "listenPort" : 6000, - "idleTimeoutSec" : 1200, - "maximumPayloadSizeBytes" : 512000, - "dummyMode" : false + "logLevel": "ERROR", + "server": { + "healthCheckApiPort": 5000, + "listenPort": 6000, + "idleTimeoutSec": 1200, + "maxPayloadSizeBytes": 512000 }, - "cbs" : { + "cbs": { "firstRequestDelaySec": 7, "requestIntervalSec": 900 }, - "security" : { + "security": { "sslDisable": false, "keys": { "keyStoreFile": "test.ks.pkcs12", @@ -19,7 +19,9 @@ "trustStorePassword": "changeMeToo" } }, - "kafka" : { + "collector": { + "dummyMode": false, + "maxRequestSizeBytes": 512000, "kafkaServers": [ "192.168.255.1:5005", "192.168.255.1:5006" @@ -30,6 +32,5 @@ "toTopic": "HV_VES_PERF3GPP" } ] - }, - "logLevel" : "ERROR" + } }
\ No newline at end of file diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 84310802..782d2324 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.utils.Closeable import reactor.core.publisher.Flux @@ -48,5 +48,5 @@ interface SinkProvider : Closeable { } interface ConfigurationProvider { - operator fun invoke(): Flux<CollectorConfiguration> + operator fun invoke(): Flux<Routing> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3ea14385..c08df748 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -20,13 +20,12 @@ package org.onap.dcae.collectors.veshv.factory import arrow.core.Option -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maximumPayloadSizeBytes: Int, + private val maxPayloadSizeBytes: Int, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config: AtomicReference<CollectorConfiguration> = AtomicReference() + val config = AtomicReference<Routing>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(config.routing, ctx), + router = Router(routing, ctx), sink = sinkProvider(ctx), metrics = metrics) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 58a8599a..6c4e4671 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory /** @@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory * @since May 2018 */ object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, + + private val sslFactory = SslContextFactory() + + fun createNettyTcpServer(serverConfig: ServerConfiguration, + securityConfig: SecurityConfiguration, collectorProvider: CollectorProvider, - metrics: Metrics): Server = - NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics) + metrics: Metrics + ): Server = NettyTcpServer( + serverConfig, + sslFactory.createServerContext(securityConfig), + collectorProvider, + metrics + ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index a853839a..c362020e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(dummyMode: Boolean, - kafkaConfig: KafkaConfiguration): SinkProvider = - if (dummyMode) + fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = + if (config.dummyMode) LoggingSinkProvider() else - KafkaSinkProvider(kafkaConfig) + KafkaSinkProvider(config) - fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - configurationProviderParams) + config) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 51b6d4f0..754a2efc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog @@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie retrySpec: Retry<Any> ) : ConfigurationProvider { - constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this( + constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this( cbsClientMono, params.firstRequestDelay, params.requestInterval, @@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<CollectorConfiguration> = + override fun invoke(): Flux<Routing> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient .updates(RequestDiagnosticContext.create(), firstRequestDelay, requestInterval) @@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .retryWhen(retry) - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + private fun createCollectorConfiguration(configuration: JsonObject): Routing = try { val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) - CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject - defineRoute { - fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - ) + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject + defineRoute { + fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() } catch (e: NullPointerException) { throw ParsingException("Failed to parse configuration", e) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index f52890b0..96e45a02 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender @@ -46,7 +46,7 @@ import java.lang.Integer.max internal class KafkaSinkProvider internal constructor( private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider { - constructor(config: KafkaConfiguration) : this(constructKafkaSender(config)) + constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) @@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor( private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - private fun constructKafkaSender(config: KafkaConfiguration) = + + private fun constructKafkaSender(config: CollectorConfiguration) = KafkaSender.create(constructSenderOptions(config)) - private fun constructSenderOptions(config: KafkaConfiguration) = + private fun constructSenderOptions(config: CollectorConfiguration) = SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config)) + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) + .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) + .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) @@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor( .producerProperty(ACKS_CONFIG, "1") .stopOnError(false) - private fun maxRequestSize(config: KafkaConfiguration) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt() + private fun maxRequestSize(maxRequestSizeBytes: Int) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - private fun bufferMemory(config: KafkaConfiguration) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes) + private fun bufferMemory(maxRequestSizeBytes: Int) = + max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 123956ad..fab96560 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.core.Option import arrow.core.getOrElse import arrow.effects.IO +import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics @@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -41,6 +42,7 @@ import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer import java.net.InetAddress +import java.net.InetSocketAddress import java.time.Duration @@ -48,14 +50,14 @@ import java.time.Duration * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class NettyTcpServer(private val serverConfig: ServerConfiguration, - private val sslContextFactory: SslContextFactory, +internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, + private val sslContext: Option<SslContext>, private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { override fun start(): IO<ServerHandle> = IO { TcpServer.create() - .addressSupplier { serverConfig.serverListenAddress } + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } .configureSsl() .handle(this::handleConnection) .doOnUnbound { @@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } private fun TcpServer.configureSsl() = - sslContextFactory - .createServerContext(serverConfig.securityConfiguration) - .map { sslContext -> + sslContext + .map { serverContext -> logger.info { "Collector configured with SSL enabled" } - this.secure { b -> b.sslContext(sslContext) } + this.secure { it.sslContext(serverContext) } }.getOrElse { logger.info { "Collector configured with SSL disabled" } this @@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector -> withConnectionFrom(nettyInbound) { connection -> connection - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { collector.handleConnection(nettyInbound.createDataStream()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index 21aaa129..f830f2c9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val route1 = it.routing.routes[0] + val route1 = it.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") .isEqualTo(route1.domain) @@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({ .describedAs("target topic 1") .isEqualTo(route1.targetTopic) - val route2 = it.routing.routes[1] + val route2 = it.routes[1] assertThat(HEARTBEAT.domainName) .describedAs("routed domain 2") .isEqualTo(route2.domain) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 068476ad..1e3f2e7a 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.ves.VesEventOuterClass import reactor.kafka.sender.KafkaSender @@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender internal object KafkaSinkProviderTest : Spek({ describe("non functional requirements") { given("sample configuration") { - val config = KafkaConfiguration("localhost:9090", - 1024 * 1024) + val config = CollectorConfiguration( + dummyMode = false, + maxRequestSizeBytes = 1024 * 1024, + kafkaServers = "localhost:9090", + routing = routing { }.build()) + val cut = KafkaSinkProvider(config) on("sample clients") { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index aaa3ee3b..bd056d4d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(basicConfiguration) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index dc5fe60b..ece42285 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index d97541ba..e84e9486 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,11 +27,18 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.tests.fakes.* +import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink +import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink +import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState +import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -40,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()): AutoCloseable { +class Sut(sink: Sink = StoringSink()) : AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -94,17 +101,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index ed46b119..17f6ce32 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,21 +24,24 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.* - +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader +import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload import reactor.core.publisher.Flux import java.time.Duration @@ -149,7 +152,7 @@ object VesHvSpecification : Spek({ it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -210,12 +213,12 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) @@ -317,7 +320,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -346,6 +349,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index c7e12bbd..1ad2b0e3 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,12 +20,11 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT - +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -35,62 +34,55 @@ const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -val basicConfiguration: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) - -val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } - }.build() -) +val basicRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } +}.build() -val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) +val twoDomainsToOneTopicRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(HEARTBEAT.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(MEASUREMENT.domainName) + toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + withFixedPartitioning() + } +}.build() + + +val configurationWithDifferentRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(ALTERNATE_PERF3GPP_TOPIC) + withFixedPartitioning() + } +}.build() + +val emptyRouting = routing { }.build() -val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( - routing = routing { - }.build() -) class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() + private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() - fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(collectorConfiguration) + configStream.onNext(routing) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index dc9be16b..f6d1eab7 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -20,9 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config import arrow.core.Option -import arrow.core.fix -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration @@ -34,6 +31,7 @@ import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYL import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import java.net.InetSocketAddress class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { @@ -45,7 +43,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration ) override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> = - Option.monad().binding { + binding { val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind() @@ -62,5 +60,5 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration maxPayloadSizeBytes, kafkaBootstrapServers, kafkaTopics) - }.fix() + } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 39fcae21..c8a3c013 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,58 +19,58 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monad.monad -import arrow.typeclasses.binding -import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.neverComplete import org.onap.dcae.collectors.veshv.utils.registerShutdownHook +import reactor.core.scheduler.Schedulers +import java.util.concurrent.atomic.AtomicReference -private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv" -private val logger = Logger("$VESHV_PACKAGE.main") -private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt" -fun main(args: Array<String>) = - ConfigurationModule() - .createConfigurationFromCommandLine(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } - ) +private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv" +private val logger = Logger("$VES_HV_PACKAGE.main") -private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } +private val hvVesServer = AtomicReference<ServerHandle>() - val healthCheckServerHandle = HealthCheckServer.start(config).bind() - val hvVesHandle = VesServer.start(config).bind() +fun main(args: Array<String>) { + HealthCheckServer.start() + ConfigurationModule() + .hvVesConfigurationUpdates(args) + .publishOn(Schedulers.single(Schedulers.elastic())) + .doOnNext(::startServer) + .doOnError(::logServerStartFailed) + .neverComplete() // TODO: remove after merging configuration stream with cbs + .block() +} + +private fun startServer(config: HvVesConfiguration) { + stopRunningServer() + Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + + VesServer.start(config).let { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } +} - registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) - hvVesHandle.await().bind() - }.fix() +private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() -internal fun closeServers(vararg handles: ServerHandle, - healthState: HealthState = HealthState.INSTANCE) = { +internal fun shutdownGracefully(serverHandle: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).unsafeRunSync() + serverHandle.close().unsafeRunSync() logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } + +private fun logServerStartFailed(ex: Throwable) = + logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } + diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index 15472b5e..bc284d08 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -19,25 +19,38 @@ */ package org.onap.dcae.collectors.veshv.main.servers -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.net.InetSocketAddress /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -object HealthCheckServer : ServerStarter() { - override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() +object HealthCheckServer { - private fun createHealthCheckServer(config: ServerConfiguration) = + private const val DEFAULT_HEALTHCHECK_PORT = 6060 + private val logger = Logger(HealthCheckServer::class) + + fun start(port: Int = DEFAULT_HEALTHCHECK_PORT) = + createHealthCheckServer(port) + .start() + .then(::logServerStarted) + .unsafeRunSync() + + private fun createHealthCheckServer(listenPort: Int) = HealthCheckApiServer( HealthState.INSTANCE, MicrometerMetrics.INSTANCE.metricsProvider, - config.healthCheckApiListenAddress) + InetSocketAddress(listenPort)) - override fun serverStartedMessage(handle: ServerHandle) = - "Health check server is up and listening on ${handle.host}:${handle.port}" + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "Health check server is up and listening on ${handle.host}:${handle.port}" + } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt deleted file mode 100644 index 74a66324..00000000 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main.servers - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.then -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since August 2018 - */ -abstract class ServerStarter { - fun start(config: ServerConfiguration): IO<ServerHandle> = - startServer(config) - .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } } - - protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> - protected abstract fun serverStartedMessage(handle: ServerHandle): String - - companion object { - private val logger = Logger(ServerStarter::class) - } -} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 0f5e45ec..d15dccef 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -19,33 +19,54 @@ */ package org.onap.dcae.collectors.veshv.main.servers -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -object VesServer : ServerStarter() { - override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() - - private fun createVesServer(config: ServerConfiguration): Server { - val collectorProvider = CollectorFactory( - AdapterFactory.configurationProvider(config.configurationProviderParams), - AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), - MicrometerMetrics.INSTANCE, - config.maximumPayloadSizeBytes - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE) - } - - override fun serverStartedMessage(handle: ServerHandle) = - "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +object VesServer { + + private val logger = Logger(VesServer::class) + + fun start(config: HvVesConfiguration): ServerHandle = + createVesServer(config) + .start() + .then(::logServerStarted) + .unsafeRunSync() + + private fun createVesServer(config: HvVesConfiguration): Server = + initializeCollectorFactory(config) + .createVesHvCollectorProvider() + .let { collectorProvider -> + ServerFactory.createNettyTcpServer( + config.server, + config.security, + collectorProvider, + MicrometerMetrics.INSTANCE + ) + } + + private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = + CollectorFactory( + AdapterFactory.configurationProvider(config.cbs), + AdapterFactory.sinkCreatorFactory(config.collector), + MicrometerMetrics.INSTANCE, + config.server.maxPayloadSizeBytes + ) + + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" + } + } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index e18b0b10..d8de9f25 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -42,7 +42,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle internal object MainTest : Spek({ describe("closeServer shutdown hook") { given("server handles and health state") { - val handle: ServerHandle = mock() + val handle = mock<ServerHandle>() var closed = false val handleClose = IO { closed = true @@ -50,8 +50,8 @@ internal object MainTest : Spek({ whenever(handle.close()).thenReturn(handleClose) val healthState: HealthState = mock() - on("closeServers") { - closeServers(handle, healthState = healthState).invoke() + on("shutdownGracefully") { + shutdownGracefully(handle, healthState = healthState) it("should close all handles") { assertThat(closed).isTrue() diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt index fb211115..579eb84c 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt @@ -21,11 +21,9 @@ package org.onap.dcae.collectors.veshv.ssl.boundary import arrow.core.Option import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.nio.file.Path /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class SecurityConfiguration( - val keys: Option<SecurityKeys>) +data class SecurityConfiguration(val keys: Option<SecurityKeys>) diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt index 805d94d2..f72ddecb 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt @@ -30,7 +30,7 @@ import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory class SslContextFactory(private val sslFactory: SslFactory = SslFactory()) { fun createServerContext(secConfig: SecurityConfiguration): Option<SslContext> = secConfig.keys.map { sslFactory.createSecureServerContext(it) } + fun createClientContext(secConfig: SecurityConfiguration): Option<SslContext> = secConfig.keys.map { sslFactory.createSecureClientContext(it) } - } diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt index f8632350..822d84f1 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt @@ -51,7 +51,7 @@ fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> Securit private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE) -private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None) +private fun disabledSecurityConfiguration() = SecurityConfiguration(None) private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration { val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE) @@ -66,8 +66,7 @@ private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfigur .trustStorePassword(Passwords.fromString(tsPass)) .build() - return SecurityConfiguration(keys = Some(keys)) + return SecurityConfiguration(Some(keys)) } - private fun pathFromFile(file: String) = Paths.get(file) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt index 847279d9..24fedbbc 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,12 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.config.api.model +package org.onap.dcae.collectors.veshv.tests.utils -import java.time.Duration +import java.io.InputStreamReader -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since July 2018 - */ -data class ConfigurationProviderParams(val firstRequestDelay: Duration, - val requestInterval: Duration) +fun <T> Class<T>.resourceAsStream(resourcePath: String): InputStreamReader = + InputStreamReader(getResourceAsStream(resourcePath)) + +fun <T> Class<T>.absoluteResourcePath(resourcePath: String): String = + getResource(resourcePath).path diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index bedc2fcd..d5b33b91 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -20,10 +20,20 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either +import arrow.core.ForOption import arrow.core.Option import arrow.core.Try +import arrow.core.fix import arrow.core.identity +import arrow.effects.ForIO +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monad.monad +import arrow.instances.option.monad.monad import arrow.syntax.collections.firstOption +import arrow.typeclasses.MonadContinuation +import arrow.typeclasses.binding +import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference /** @@ -31,12 +41,24 @@ import java.util.concurrent.atomic.AtomicReference * @since July 2018 */ +object OptionUtils { + fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A) + : Option<A> = Option.monad().binding(c).fix() +} + +object IOUtils { + fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A) + : IO<A> = IO.monad().binding(c).fix() +} + fun <A> Either<A, A>.flatten() = fold(::identity, ::identity) fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity) fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity) +fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) } + fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get()) fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> = @@ -57,3 +79,13 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply { action(exception) } } + +fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B) + : Option<B> = let { OptionUtils.binding { c(it) } } + + + + + + + diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt index f9dff203..aaa598d2 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.config.api.model +package org.onap.dcae.collectors.veshv.utils -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since December 2018 - */ -data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int) +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then()
\ No newline at end of file diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt index 87aea41e..cc940907 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt @@ -19,10 +19,19 @@ */ package org.onap.dcae.collectors.veshv.utils +import java.util.concurrent.atomic.AtomicReference + /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since January 2019 */ -fun registerShutdownHook(job: () -> Unit) = - Runtime.getRuntime() - .addShutdownHook(Thread({ job() }, "GracefulShutdownThread")) + +private val currentShutdownHook = AtomicReference<Thread>() + +fun registerShutdownHook(job: () -> Unit) { + val runtime = Runtime.getRuntime() + val newShutdownHook = Thread({ job() }, "GracefulShutdownThread") + currentShutdownHook.get()?.run(runtime::removeShutdownHook) + currentShutdownHook.set(newShutdownHook) + runtime.addShutdownHook(newShutdownHook) +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 08c05e05..28cc0556 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -20,9 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import arrow.core.Option -import arrow.core.fix -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration @@ -40,6 +37,7 @@ import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.net.InetSocketAddress @@ -62,7 +60,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon TRUST_STORE_PASSWORD) override fun getConfiguration(cmdLine: CommandLine): Option<SimulatorConfiguration> = - Option.monad().binding { + binding { val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() val vesPort = cmdLine.intValue(VES_HV_PORT).bind() @@ -86,7 +84,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon InetSocketAddress(vesHost, vesPort), maxPayloadSizeBytes, security) - }.fix() + } internal object DefaultValues { const val HEALTH_CHECK_API_PORT = 6063 diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index a73b39b1..a1042f3e 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -20,9 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monad.monad -import arrow.typeclasses.binding import io.vavr.collection.HashSet import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription @@ -36,6 +33,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfigura import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -62,7 +60,7 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) ) private fun startServers(config: SimulatorConfiguration): IO<Unit> = - IO.monad().binding { + binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() @@ -79,5 +77,5 @@ private fun startServers(config: SimulatorConfiguration): IO<Unit> = HealthState.INSTANCE.changeState(HealthDescription.IDLE) xnfApiServerHandler.await().bind() - }.fix() + } |