From 30afcb56b0c6c4529fdaf68d7b061eee44d68d16 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Wed, 13 Mar 2019 18:44:31 +0100 Subject: Remove environment variables and program arguments - Move all command line program arguments to json file. - Reorganize configuration classes and the way they are passed through application - Implement HV VES configuration stream - Create concrete configuration from partial one - Modify main HV-VES server starting pipeline Change-Id: I6cf874b6904ed768e4820b8132f5f760299c929e Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1340 --- .../veshv/commandline/ArgBasedConfiguration.kt | 37 ++-- .../veshv/commandline/CommandLineOption.kt | 169 ++++++++---------- .../collectors/veshv/commandline/extensions.kt | 4 +- .../veshv/commandline/CommandLineOptionTest.kt | 7 +- .../veshv/config/api/ConfigurationModule.kt | 22 ++- .../config/api/model/CollectorConfiguration.kt | 26 --- .../veshv/config/api/model/Configuration.kt | 54 ++++++ .../api/model/ConfigurationProviderParams.kt | 29 ---- .../veshv/config/api/model/Exceptions.kt | 24 +++ .../veshv/config/api/model/KafkaConfiguration.kt | 26 --- .../collectors/veshv/config/api/model/Routing.kt | 77 +++++++++ .../veshv/config/api/model/ServerConfiguration.kt | 42 ----- .../collectors/veshv/config/api/model/routing.kt | 85 --------- .../veshv/config/impl/ArgHvVesConfiguration.kt | 36 ++++ .../veshv/config/impl/ArgVesHvConfiguration.kt | 151 ---------------- .../veshv/config/impl/ConfigurationValidator.kt | 118 +++++++++++++ .../veshv/config/impl/PartialConfiguration.kt | 35 ++-- .../veshv/config/api/ArgVesHvConfigurationTest.kt | 192 --------------------- .../config/api/FileConfigurationReaderTest.kt | 147 ---------------- .../veshv/config/impl/ArgHvVesConfigurationTest.kt | 73 ++++++++ .../config/impl/ConfigurationValidatorTest.kt | 138 +++++++++++++++ .../config/impl/FileConfigurationReaderTest.kt | 153 ++++++++++++++++ .../src/test/resources/sampleConfig.json | 23 +-- .../dcae/collectors/veshv/boundary/adapters.kt | 6 +- .../collectors/veshv/factory/CollectorFactory.kt | 15 +- .../dcae/collectors/veshv/factory/ServerFactory.kt | 16 +- .../veshv/impl/adapters/AdapterFactory.kt | 15 +- .../impl/adapters/ConfigurationProviderImpl.kt | 34 ++-- .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 25 +-- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 19 +- .../impl/adapters/ConfigurationProviderTest.kt | 4 +- .../impl/adapters/kafka/KafkaSinkProviderTest.kt | 13 +- .../veshv/tests/component/MetricsSpecification.kt | 14 +- .../tests/component/PerformanceSpecification.kt | 6 +- .../dcae/collectors/veshv/tests/component/Sut.kt | 27 +-- .../veshv/tests/component/VesHvSpecification.kt | 29 ++-- .../collectors/veshv/tests/fakes/configuration.kt | 88 +++++----- .../impl/config/ArgDcaeAppSimConfiguration.kt | 8 +- .../org/onap/dcae/collectors/veshv/main/main.kt | 72 ++++---- .../veshv/main/servers/HealthCheckServer.kt | 27 ++- .../collectors/veshv/main/servers/ServerStarter.kt | 44 ----- .../collectors/veshv/main/servers/VesServer.kt | 57 ++++-- .../onap/dcae/collectors/veshv/main/MainTest.kt | 6 +- .../veshv/ssl/boundary/SecurityConfiguration.kt | 4 +- .../veshv/ssl/boundary/SslContextFactory.kt | 2 +- .../dcae/collectors/veshv/ssl/boundary/utils.kt | 5 +- .../dcae/collectors/veshv/tests/utils/resources.kt | 28 +++ .../onap/dcae/collectors/veshv/utils/arrow/core.kt | 32 ++++ .../onap/dcae/collectors/veshv/utils/reactive.kt | 25 +++ .../dcae/collectors/veshv/utils/shutdown_hook.kt | 15 +- .../impl/config/ArgXnfSimulatorConfiguration.kt | 8 +- .../dcae/collectors/veshv/simulators/xnf/main.kt | 8 +- 52 files changed, 1183 insertions(+), 1137 deletions(-) delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt delete mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt create mode 100644 sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt delete mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt delete mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt create mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt create mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt create mode 100644 sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt delete mode 100644 sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt create mode 100644 sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt create mode 100644 sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt (limited to 'sources') diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt index 1e45c923..93d42f96 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt @@ -33,25 +33,24 @@ import java.nio.file.Paths abstract class ArgBasedConfiguration(private val parser: CommandLineParser) { abstract val cmdLineOptionsList: List - fun parse(args: Array): Either { - val parseResult = Try { - val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption) - parser.parse(commandLineOptions, args) - } - return parseResult - .toEither() - .mapLeft { ex -> WrongArgumentError(ex, cmdLineOptionsList) } - .map(this::getConfiguration) - .flatMap { - it.toEither { - WrongArgumentError( - message = "Unexpected error when parsing command line arguments", - cmdLineOptionsList = cmdLineOptionsList) - } - } - } - protected abstract fun getConfiguration(cmdLine: CommandLine): Option - protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI()) + fun parse(args: Array): Either = + Try { parseArgumentsArray(args) } + .toEither() + .mapLeft { WrongArgumentError(it, cmdLineOptionsList) } + .map(this::getConfiguration) + .flatMap { + it.toEither { + WrongArgumentError( + message = "Unexpected error when parsing command line arguments", + cmdLineOptionsList = cmdLineOptionsList) + } + } + + private fun parseArgumentsArray(args: Array) = + cmdLineOptionsList + .map { it.option } + .fold(Options(), Options::addOption) + .let { parser.parse(it, args) } } diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt index 31849215..1c1a355b 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt @@ -24,78 +24,72 @@ import org.apache.commons.cli.Option enum class CommandLineOption(val option: Option, val required: Boolean = false) { HEALTH_CHECK_API_PORT( - Option.builder("H") - .longOpt("health-check-api-port") - .hasArg() - .desc("Health check rest api listen port") - .build() - ), - LISTEN_PORT( - Option.builder("p") - .longOpt("listen-port") - .hasArg() - .desc("Listen port") - .build(), - required = true + Option.builder("H") + .longOpt("health-check-api-port") + .hasArg() + .desc("Health check rest api listen port") + .build() ), - CONFIGURATION_FIRST_REQUEST_DELAY( - Option.builder("d") - .longOpt("first-request-delay") - .hasArg() - .desc("Delay of first request for configuration in seconds") - .build() + CONFIGURATION_FILE( + Option.builder("c") + .longOpt("configuration-file") + .hasArg() + .desc("Json file containing HV-VES configuration") + .build(), + required = true ), - CONFIGURATION_REQUEST_INTERVAL( - Option.builder("I") - .longOpt("request-interval") - .hasArg() - .desc("Interval of configuration requests in seconds") - .build() + LISTEN_PORT( + Option.builder("p") + .longOpt("listen-port") + .hasArg() + .desc("Listen port") + .build(), + required = true ), VES_HV_PORT( - Option.builder("v") - .longOpt("ves-port") - .hasArg() - .desc("VesHvCollector port") - .build(), - required = true + Option.builder("v") + .longOpt("ves-port") + .hasArg() + .desc("VesHvCollector port") + .build(), + required = true ), VES_HV_HOST( - Option.builder("h") - .longOpt("ves-host") - .hasArg() - .desc("VesHvCollector host") - .build(), - required = true + Option.builder("h") + .longOpt("ves-host") + .hasArg() + .desc("VesHvCollector host") + .build(), + required = true ), KAFKA_SERVERS( - Option.builder("s") - .longOpt("kafka-bootstrap-servers") - .hasArg() - .desc("Comma-separated Kafka bootstrap servers in : format") - .build(), - required = true + Option.builder("s") + .longOpt("kafka-bootstrap-servers") + .hasArg() + .desc("Comma-separated Kafka bootstrap servers in : format") + .build(), + required = true ), KAFKA_TOPICS( - Option.builder("f") - .longOpt("kafka-topics") - .hasArg() - .desc("Comma-separated Kafka topics") - .build(), - required = true + Option.builder("f") + .longOpt("kafka-topics") + .hasArg() + .desc("Comma-separated Kafka topics") + .build(), + required = true ), SSL_DISABLE( - Option.builder("l") - .longOpt("ssl-disable") - .desc("Disable SSL encryption") - .build() + Option.builder("l") + .longOpt("ssl-disable") + .desc("Disable SSL encryption") + .build() ), KEY_STORE_FILE( - Option.builder("k") - .longOpt("key-store") - .hasArg() - .desc("Key store in PKCS12 format") - .build() + Option.builder("k") + .longOpt("key-store") + .hasArg() + .desc("Key store in PKCS12 format") + .build() ), KEY_STORE_PASSWORD( Option.builder("kp") @@ -105,54 +99,31 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false) .build() ), TRUST_STORE_FILE( - Option.builder("t") - .longOpt("trust-store") - .hasArg() - .desc("File with trusted certificate bundle in PKCS12 format") - .build() + Option.builder("t") + .longOpt("trust-store") + .hasArg() + .desc("File with trusted certificate bundle in PKCS12 format") + .build() ), TRUST_STORE_PASSWORD( - Option.builder("tp") - .longOpt("trust-store-password") - .hasArg() - .desc("Trust store password") - .build() - ), - IDLE_TIMEOUT_SEC( - Option.builder("i") - .longOpt("idle-timeout-sec") - .hasArg() - .desc( - """Idle timeout for remote hosts. After given time without any data exchange the - |connection might be closed.""".trimMargin() - ) - .build() + Option.builder("tp") + .longOpt("trust-store-password") + .hasArg() + .desc("Trust store password") + .build() ), MAXIMUM_PAYLOAD_SIZE_BYTES( - Option.builder("m") - .longOpt("max-payload-size") - .hasArg() - .desc("Maximum supported payload size in bytes") - .build() - ), - LOG_LEVEL( - Option.builder("ll") - .longOpt("log-level") - .hasArg() - .desc("Log level") - .build() - ), - DUMMY_MODE( - Option.builder("u") - .longOpt("dummy") - .desc("If present will start in dummy mode (dummy external services)") - .build() + Option.builder("m") + .longOpt("max-payload-size") + .hasArg() + .desc("Maximum supported payload size in bytes") + .build() ); fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String = - option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> - "${prefix}_${mainPart}" - } + option.longOpt.toUpperCase().replace('-', '_').let { mainPart -> + "${prefix}_${mainPart}" + } companion object { private const val DEFAULT_ENV_PREFIX = "VESHV" diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt index c0fbcde6..48cac69a 100644 --- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt +++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt @@ -32,13 +32,13 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain * @since June 2018 */ +val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried() + fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO = IO { err.printMessage() err.printHelp(programName) }.flatMap { ExitFailure(2).io() } -val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried() - fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = longValue(cmdLineOpt).getOrElse { default } diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt index 736710ff..6614e77f 100644 --- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt +++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt @@ -25,8 +25,7 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.* /** * @author Piotr Jaszczyk @@ -49,13 +48,13 @@ class CommandLineOptionTest : Spek({ } given("sample option without prefix") { - val opt = DUMMY_MODE + val opt = SSL_DISABLE on("calling environmentVariableName") { val result = opt.environmentVariableName() it("should return prefixed upper snake cased long option name") { - assertThat(result).isEqualTo("VESHV_DUMMY") + assertThat(result).isEqualTo("VESHV_SSL_DISABLE") } } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index 874cc5b0..5b547a28 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -19,13 +19,25 @@ */ package org.onap.dcae.collectors.veshv.config.api -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException +import org.onap.dcae.collectors.veshv.config.impl.ArgHvVesConfiguration +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator +import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader +import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft import reactor.core.publisher.Flux class ConfigurationModule { - fun createConfigurationFromCommandLine(args: Array) = - ArgVesHvConfiguration().parse(args) - fun hvVesConfigurationUpdates(): Flux = Flux.never() + private val cmd = ArgHvVesConfiguration() + private val configReader = FileConfigurationReader() + private val configValidator = ConfigurationValidator() + + fun hvVesConfigurationUpdates(args: Array): Flux = + Flux.just(cmd.parse(args)) + .throwOnLeft { MissingArgumentException(it.message, it.cause) } + .map { configReader.loadConfig(it.reader()) } + .map { configValidator.validate(it) } + .throwOnLeft { ValidationException(it.message) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt deleted file mode 100644 index 7999451e..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -data class CollectorConfiguration(val routing: Routing) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt new file mode 100644 index 00000000..566f2c08 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import java.time.Duration + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +data class HvVesConfiguration( + val server: ServerConfiguration, + val cbs: CbsConfiguration, + val security: SecurityConfiguration, + val collector: CollectorConfiguration, + val logLevel: LogLevel +) + +data class ServerConfiguration( + val listenPort: Int, + val idleTimeout: Duration, + val maxPayloadSizeBytes: Int +) + +data class CbsConfiguration( + val firstRequestDelay: Duration, + val requestInterval: Duration +) + +data class CollectorConfiguration( + val maxRequestSizeBytes: Int, + val kafkaServers: String, + val routing: Routing, + val dummyMode: Boolean = false +) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt deleted file mode 100644 index 847279d9..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since July 2018 - */ -data class ConfigurationProviderParams(val firstRequestDelay: Duration, - val requestInterval: Duration) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt new file mode 100644 index 00000000..2fc29829 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt @@ -0,0 +1,24 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) + +class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt deleted file mode 100644 index f9dff203..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt new file mode 100644 index 00000000..aab8ecad --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.api.model + +import arrow.core.Option +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventOuterClass.CommonEventHeader + +data class Routing(val routes: List) { + + fun routeFor(commonHeader: CommonEventHeader): Option = + Option.fromNullable(routes.find { it.applies(commonHeader) }) +} + +data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) { + + fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain + + operator fun invoke(message: VesMessage): RoutedMessage = + RoutedMessage(targetTopic, partitioning(message.header), message) +} + + +/* +HvVesConfiguration DSL +*/ + +fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init) + +class RoutingBuilder { + private val routes: MutableList = mutableListOf() + + fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder() + .apply(init) + .also { routes.add(it) } + + fun build() = Routing(routes.map { it.build() }.toList()) +} + +class RouteBuilder { + + private lateinit var domain: String + private lateinit var targetTopic: String + private lateinit var partitioning: (CommonEventHeader) -> Int + + fun fromDomain(domain: String): RouteBuilder = apply { + this.domain = domain + } + + fun toTopic(targetTopic: String): RouteBuilder = apply { + this.targetTopic = targetTopic + } + + fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply { + partitioning = { num } + } + + fun build() = Route(domain, targetTopic, partitioning) +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt deleted file mode 100644 index 8fbc649b..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.net.InetSocketAddress -import java.time.Duration - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -data class ServerConfiguration( - val serverListenAddress: InetSocketAddress, - val kafkaConfiguration: KafkaConfiguration, - val configurationProviderParams: ConfigurationProviderParams, - val securityConfiguration: SecurityConfiguration, - val idleTimeout: Duration, - val healthCheckApiListenAddress: InetSocketAddress, - val maximumPayloadSizeBytes: Int, - val logLevel: LogLevel, - val dummyMode: Boolean = false -) - diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt deleted file mode 100644 index 847f35ad..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt +++ /dev/null @@ -1,85 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api.model - -import arrow.core.Option -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -data class Routing(val routes: List) { - - fun routeFor(commonHeader: CommonEventHeader): Option = - Option.fromNullable(routes.find { it.applies(commonHeader) }) -} - -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = {0}) { - - fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain - - operator fun invoke(message: VesMessage): RoutedMessage = - RoutedMessage(targetTopic, partitioning(message.header), message) -} - - -/* -Configuration DSL - */ - -fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder { - val conf = RoutingBuilder() - conf.init() - return conf -} - -class RoutingBuilder { - private val routes: MutableList = mutableListOf() - - fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder { - val rule = RouteBuilder() - rule.init() - routes.add(rule) - return rule - } - - fun build() = Routing(routes.map { it.build() }.toList()) -} - -class RouteBuilder { - - private lateinit var domain: String - private lateinit var targetTopic: String - private lateinit var partitioning: (CommonEventHeader) -> Int - - fun fromDomain(domain: String) : RouteBuilder = apply { - this.domain = domain - } - - fun toTopic(targetTopic: String) : RouteBuilder = apply { - this.targetTopic = targetTopic - } - - fun withFixedPartitioning(num: Int = 0) : RouteBuilder = apply { - partitioning = { num } - } - - fun build() = Route(domain, targetTopic, partitioning) - -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt new file mode 100644 index 00000000..9587d5b0 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Option +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE +import org.onap.dcae.collectors.veshv.commandline.stringValue +import java.io.File + +internal class ArgHvVesConfiguration : ArgBasedConfiguration(DefaultParser()) { + override val cmdLineOptionsList = listOf(CONFIGURATION_FILE) + + override fun getConfiguration(cmdLine: CommandLine): Option = + cmdLine.stringValue(CONFIGURATION_FILE).map(::File) + +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt deleted file mode 100644 index 08346d30..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt +++ /dev/null @@ -1,151 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import arrow.core.Option -import arrow.core.fix -import arrow.core.getOrElse -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding -import org.apache.commons.cli.CommandLine -import org.apache.commons.cli.DefaultParser -import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_FILE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LOG_LEVEL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.SSL_DISABLE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_FILE -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.commandline.hasOption -import org.onap.dcae.collectors.veshv.commandline.intValue -import org.onap.dcae.collectors.veshv.commandline.longValue -import org.onap.dcae.collectors.veshv.commandline.stringValue -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.net.InetSocketAddress -import java.time.Duration - - -internal class ArgVesHvConfiguration : ArgBasedConfiguration(DefaultParser()) { - override val cmdLineOptionsList = listOf( - KAFKA_SERVERS, - HEALTH_CHECK_API_PORT, - LISTEN_PORT, - CONFIGURATION_FIRST_REQUEST_DELAY, - CONFIGURATION_REQUEST_INTERVAL, - SSL_DISABLE, - KEY_STORE_FILE, - KEY_STORE_PASSWORD, - TRUST_STORE_FILE, - TRUST_STORE_PASSWORD, - IDLE_TIMEOUT_SEC, - MAXIMUM_PAYLOAD_SIZE_BYTES, - DUMMY_MODE, - LOG_LEVEL - ) - - override fun getConfiguration(cmdLine: CommandLine): Option = - Option.monad().binding { - val healthCheckApiPort = cmdLine.intValue( - HEALTH_CHECK_API_PORT, - DefaultValues.HEALTH_CHECK_API_PORT - ) - val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() - val listenPort = cmdLine.intValue(LISTEN_PORT).bind() - val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) - val maxPayloadSizeBytes = cmdLine.intValue( - MAXIMUM_PAYLOAD_SIZE_BYTES, - DefaultValues.MAX_PAYLOAD_SIZE_BYTES - ) - val dummyMode = cmdLine.hasOption(DUMMY_MODE) - val security = createSecurityConfiguration(cmdLine) - .doOnFailure { ex -> - logger.withError { log("Could not read security keys", ex) } - } - .toOption() - .bind() - val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL) - val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() - ServerConfiguration( - serverListenAddress = InetSocketAddress(listenPort), - kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes), - healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), - configurationProviderParams = configurationProviderParams, - securityConfiguration = security, - idleTimeout = Duration.ofSeconds(idleTimeoutSec), - maximumPayloadSizeBytes = maxPayloadSizeBytes, - dummyMode = dummyMode, - logLevel = determineLogLevel(logLevel) - ) - }.fix() - - private fun createConfigurationProviderParams(cmdLine: CommandLine): Option = - Option.monad().binding { - val firstRequestDelay = cmdLine.longValue( - CONFIGURATION_FIRST_REQUEST_DELAY, - DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY - ) - val requestInterval = cmdLine.longValue( - CONFIGURATION_REQUEST_INTERVAL, - DefaultValues.CONFIGURATION_REQUEST_INTERVAL - ) - ConfigurationProviderParams( - Duration.ofSeconds(firstRequestDelay), - Duration.ofSeconds(requestInterval) - ) - }.fix() - - private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel) - .getOrElse { - logger.warn { - "Failed to parse $logLevel as $LOG_LEVEL command line. " + - "Using default log level (${DefaultValues.LOG_LEVEL})" - } - LogLevel.valueOf(DefaultValues.LOG_LEVEL) - } - - - internal object DefaultValues { - const val HEALTH_CHECK_API_PORT = 6060 - const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L - const val CONFIGURATION_REQUEST_INTERVAL = 5L - const val IDLE_TIMEOUT_SEC = 60L - const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES - val LOG_LEVEL = LogLevel.INFO.name - } - - companion object { - private val logger = Logger(ArgVesHvConfiguration::class) - } -} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt new file mode 100644 index 00000000..6c74c33f --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Either +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding +import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.net.InetSocketAddress +import java.time.Duration + +/** + * @author Jakub Dudycz + * @since March 2019 + */ +internal class ConfigurationValidator { + + fun validate(partialConfig: PartialConfiguration) + : Either = binding { + val logLevel = determineLogLevel(partialConfig.logLevel) + + val serverConfiguration = partialConfig.server.bind() + .let { createServerConfiguration(it).bind() } + + val cbsConfiguration = partialConfig.cbs.bind() + .let { createCbsConfiguration(it).bind() } + + val securityConfiguration = partialConfig.security.bind() + .let { createSecurityConfiguration(it).bind() } + + val collectorConfiguration = partialConfig.collector.bind() + .let { createCollectorConfig(it).bind() } + + HvVesConfiguration( + serverConfiguration, + cbsConfiguration, + securityConfiguration, + collectorConfiguration, + logLevel + ) + }.toEither { ValidationError("Some required configuration options are missing") } + + private fun determineLogLevel(logLevel: Option) = + logLevel.getOrElse { + logger.warn { + "Missing or invalid \"logLevel\" field. " + + "Using default log level ($DEFAULT_LOG_LEVEL)" + } + DEFAULT_LOG_LEVEL + } + + private fun createServerConfiguration(partial: PartialServerConfig) = + partial.mapBinding { + ServerConfiguration( + it.listenPort.bind(), + Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()), + it.maxPayloadSizeBytes.bind() + ) + } + + private fun createCbsConfiguration(partial: PartialCbsConfig) = + partial.mapBinding { + CbsConfiguration( + Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()), + Duration.ofSeconds(it.requestIntervalSec.bind().toLong()) + ) + } + + private fun createSecurityConfiguration(partial: PartialSecurityConfig) = + partial.keys.map { SecurityConfiguration(Some(it)) } + + private fun createCollectorConfig(partial: PartialCollectorConfig) = + partial.mapBinding { + CollectorConfiguration( + it.maxRequestSizeBytes.bind(), + toKafkaServersString(it.kafkaServers.bind()), + it.routing.bind(), + it.dummyMode.bind() + ) + } + + private fun toKafkaServersString(kafkaServers: List): String = + kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" } + + companion object { + val DEFAULT_LOG_LEVEL = LogLevel.INFO + private val logger = Logger(ConfigurationValidator::class) + } +} + +data class ValidationError(val message: String, val cause: Option = None) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt index 8d9cca73..3e6df3e0 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.config.impl +import arrow.core.None import arrow.core.Option import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.LogLevel @@ -30,31 +31,29 @@ import java.net.InetSocketAddress * @since February 2019 */ internal data class PartialConfiguration( - val server : Option, - val cbs : Option, - val security : Option, - val kafka : Option, - val logLevel : Option + val server: Option = None, + val cbs: Option = None, + val security: Option = None, + val collector: Option = None, + val logLevel: Option = None ) internal data class PartialServerConfig( - val healthCheckApiPort : Option, - val listenPort : Option, - val idleTimeoutSec : Option, - val maximumPayloadSizeBytes : Option, - val dummyMode : Option + val listenPort: Option = None, + val idleTimeoutSec: Option = None, + val maxPayloadSizeBytes: Option = None ) internal data class PartialCbsConfig( - val firstRequestDelaySec : Option, - val requestIntervalSec : Option + val firstRequestDelaySec: Option = None, + val requestIntervalSec: Option = None ) -internal data class PartialSecurityConfig( - val sslDisable : Option, - val keys : Option) +internal data class PartialSecurityConfig(val keys: Option = None) -internal data class PartialKafkaConfig( - val kafkaServers : Option>, - val routing : Option +internal data class PartialCollectorConfig( + val dummyMode: Option = None, + val maxRequestSizeBytes: Option = None, + val kafkaServers: Option> = None, + val routing: Option = None ) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt deleted file mode 100644 index 0f8d8d0e..00000000 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt +++ /dev/null @@ -1,192 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure -import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.time.Duration -import kotlin.test.assertNotNull - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -object ArgVesHvConfigurationTest : Spek({ - lateinit var cut: ArgVesHvConfiguration - val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" - val healthCheckApiPort = "6070" - val firstRequestDelay = "10" - val requestInterval = "5" - val listenPort = "6969" - val keyStorePassword = "kspass" - val trustStorePassword = "tspass" - val logLevel = LogLevel.DEBUG.name - - beforeEachTest { - cut = ArgVesHvConfiguration() - } - - describe("parsing arguments") { - given("all parameters are present in the long form") { - lateinit var result: ServerConfiguration - - beforeEachTest { - result = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword, - "--log-level", logLevel - ) - } - - it("should set proper kafka bootstrap servers") { - assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers) - } - - it("should set proper listen port") { - assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt()) - } - - - it("should set default listen address") { - assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0") - } - - it("should set proper health check api port") { - assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt()) - } - - it("should set default health check api address") { - assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0") - } - - it("should set proper first request delay") { - assertThat(result.configurationProviderParams.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) - } - - it("should set proper request interval") { - assertThat(result.configurationProviderParams.requestInterval) - .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) - } - - it("should set proper security configuration") { - assertThat(result.securityConfiguration.keys.isEmpty()).isFalse() - - val keys = result.securityConfiguration.keys.orNull() as SecurityKeys - assertNotNull(keys.keyStore()) - assertNotNull(keys.trustStore()) - keys.keyStorePassword().useChecked { - assertThat(it).isEqualTo(keyStorePassword.toCharArray()) - - } - keys.trustStorePassword().useChecked { - assertThat(it).isEqualTo(trustStorePassword.toCharArray()) - } - } - - it("should set proper log level") { - assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG) - } - } - - describe("required parameter is absent") { - on("missing listen port") { - it("should throw exception") { - assertThat( - cut.parseExpectingFailure( - "--ssl-disable", - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval - ) - ).isInstanceOf(WrongArgumentError::class.java) - } - } - on("missing configuration url") { - it("should throw exception") { - assertThat( - cut.parseExpectingFailure( - "--listen-port", listenPort, - "--ssl-disable", - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval - ) - ).isInstanceOf(WrongArgumentError::class.java) - } - } - } - - describe("correct log level not provided") { - on("missing log level") { - it("should set default INFO value") { - val config = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword - ) - - assertThat(config.logLevel).isEqualTo(LogLevel.INFO) - } - } - - on("incorrect log level") { - it("should set default INFO value") { - val config = cut.parseExpectingSuccess( - "--kafka-bootstrap-servers", kafkaBootstrapServers, - "--health-check-api-port", healthCheckApiPort, - "--listen-port", listenPort, - "--first-request-delay", firstRequestDelay, - "--request-interval", requestInterval, - "--key-store", "/tmp/keys.p12", - "--trust-store", "/tmp/trust.p12", - "--key-store-password", keyStorePassword, - "--trust-store-password", trustStorePassword, - "--log-level", "1" - ) - - assertThat(config.logLevel).isEqualTo(LogLevel.INFO) - } - } - } - } -}) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt deleted file mode 100644 index 55da7d02..00000000 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.api - -import arrow.core.Some -import org.jetbrains.spek.api.Spek -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader -import org.onap.dcae.collectors.veshv.config.impl.PartialCbsConfig -import org.onap.dcae.collectors.veshv.config.impl.PartialKafkaConfig -import org.onap.dcae.collectors.veshv.config.impl.PartialSecurityConfig -import org.onap.dcae.collectors.veshv.config.impl.PartialServerConfig -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel -import java.io.InputStreamReader -import java.io.StringReader -import java.net.InetSocketAddress - -/** - * @author Pawel Biniek - * @since February 2019 - */ -internal object FileConfigurationReaderTest : Spek({ - describe("A configuration loader utility") { - - describe("partial configuration loading") { - it("parses enumerations") { - val input = """{"logLevel":"ERROR"}""" - - val config = FileConfigurationReader().loadConfig(StringReader(input)) - assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) - } - - it("parses simple structure") { - val input = """{ - "server" : { - "healthCheckApiPort" : 12002, - "listenPort" : 12003 - } - } - """.trimIndent() - val config = FileConfigurationReader().loadConfig(StringReader(input)) - assertThat(config.server.nonEmpty()).isTrue() - assertThat(config.server.orNull()?.healthCheckApiPort).isEqualTo(Some(12002)) - assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003)) - } - - it("parses ip address") { - val input = """{ "kafka" : { - "kafkaServers": [ - "192.168.255.1:5005", - "192.168.255.26:5006" - ] - } - }""" - - val config = FileConfigurationReader().loadConfig(StringReader(input)) - assertThat(config.kafka.nonEmpty()).isTrue() - val kafka = config.kafka.orNull() as PartialKafkaConfig - assertThat(kafka.kafkaServers.nonEmpty()).isTrue() - val addresses = kafka.kafkaServers.orNull() as Array - assertThat(addresses) - .isEqualTo(arrayOf( - InetSocketAddress("192.168.255.1", 5005), - InetSocketAddress("192.168.255.26", 5006) - )) - } - - it("parses routing array with RoutingAdapter") { - val input = """{ - "kafka" : { - "routing" : [ - { - "fromDomain": "perf3gpp", - "toTopic": "HV_VES_PERF3GPP" - } - ] - } - }""".trimIndent() - val config = FileConfigurationReader().loadConfig(StringReader(input)) - assertThat(config.kafka.nonEmpty()).isTrue() - val kafka = config.kafka.orNull() as PartialKafkaConfig - assertThat(kafka.routing.nonEmpty()).isTrue() - val routing = kafka.routing.orNull() as Routing - routing.run { - assertThat(routes.size).isEqualTo(1) - assertThat(routes[0].domain).isEqualTo("perf3gpp") - assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP") - } - } - } - - describe("complete file loading") { - it("loads actual file") { - val config = FileConfigurationReader().loadConfig( - InputStreamReader( - FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json"))) - assertThat(config).isNotNull - assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) - - assertThat(config.security.nonEmpty()).isTrue() - val security = config.security.orNull() as PartialSecurityConfig - assertThat(security.sslDisable.orNull()).isFalse() - assertThat(security.keys.nonEmpty()).isTrue() - - assertThat(config.cbs.nonEmpty()).isTrue() - val cbs = config.cbs.orNull() as PartialCbsConfig - assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7)) - assertThat(cbs.requestIntervalSec).isEqualTo(Some(900)) - - assertThat(config.kafka.nonEmpty()).isTrue() - val kafka = config.kafka.orNull() as PartialKafkaConfig - assertThat(kafka.kafkaServers.nonEmpty()).isTrue() - assertThat(kafka.routing.nonEmpty()).isTrue() - - assertThat(config.server.nonEmpty()).isTrue() - val server = config.server.orNull() as PartialServerConfig - server.run { - assertThat(dummyMode).isEqualTo(Some(false)) - assertThat(healthCheckApiPort).isEqualTo(Some(5000)) - assertThat(idleTimeoutSec).isEqualTo(Some(1200)) - assertThat(listenPort).isEqualTo(Some(6000)) - assertThat(maximumPayloadSizeBytes).isEqualTo(Some(512000)) - } - } - } - } -}) \ No newline at end of file diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt new file mode 100644 index 00000000..dbe757c4 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError +import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure +import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess +import java.io.File + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +object ArgVesHvConfigurationTest : Spek({ + lateinit var cut: ArgHvVesConfiguration + val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json") + + beforeEachTest { + cut = ArgHvVesConfiguration() + } + + describe("parsing arguments") { + given("all parameters are present in the long form") { + lateinit var result: File + + beforeEachTest { + result = cut.parseExpectingSuccess( + "--configuration-file", configFilePath + ) + } + + it("should read proper configuration file") { + assertThat(result.exists()).isTrue() + } + } + + describe("required parameter is absent") { + on("missing configuration file path") { + it("should throw exception") { + assertThat( + cut.parseExpectingFailure( + "--non-existing-option", "" + ) + ).isInstanceOf(WrongArgumentError::class.java) + } + } + } + } +}) \ No newline at end of file diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt new file mode 100644 index 00000000..12396d23 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -0,0 +1,138 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys +import java.time.Duration + +internal object ConfigurationValidatorTest : Spek({ + describe("ConfigurationValidator") { + val cut = ConfigurationValidator() + + describe("validating partial configuration with missing fields") { + val config = PartialConfiguration( + Some(PartialServerConfig(listenPort = Some(1))) + ) + + it("should return ValidationError") { + val result = cut.validate(config) + assertThat(result.isLeft()).isTrue() + } + } + + describe("validating configuration with empty log level") { + val config = PartialConfiguration( + Some(PartialServerConfig( + Some(1), + Some(2), + Some(3) + )), + Some(PartialCbsConfig( + Some(5), + Some(3) + )), + Some(PartialSecurityConfig( + Some(mock()) + )), + Some(PartialCollectorConfig( + Some(true), + Some(4), + Some(emptyList()), + Some(routing { }.build()) + )), + None + ) + + it("should use default log level") { + val result = cut.validate(config) + result.fold( + { + fail("Configuration should have been created successfully") + }, + { + assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL) + } + ) + } + } + + describe("validating complete configuration") { + val idleTimeoutSec = 10 + val firstReqDelaySec = 10 + val securityKeys = Some(mock()) + val routing = routing { }.build() + + val config = PartialConfiguration( + Some(PartialServerConfig( + Some(1), + Some(idleTimeoutSec), + Some(2) + )), + Some(PartialCbsConfig( + Some(firstReqDelaySec), + Some(3) + )), + Some(PartialSecurityConfig( + securityKeys + )), + Some(PartialCollectorConfig( + Some(true), + Some(4), + Some(emptyList()), + Some(routing) + )), + Some(LogLevel.INFO) + ) + + it("should create valid configuration") { + val result = cut.validate(config) + result.fold( + { + fail("Configuration should have been created successfully") + }, + { + assertThat(it.server.idleTimeout) + .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong())) + + assertThat(it.security.keys) + .isEqualTo(securityKeys) + + assertThat(it.cbs.firstRequestDelay) + .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong())) + + assertThat(it.collector.routing) + .isEqualTo(routing) + } + ) + } + } + } +}) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt new file mode 100644 index 00000000..ab99f13c --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt @@ -0,0 +1,153 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Some +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import java.io.StringReader +import java.net.InetSocketAddress + +/** + * @author Pawel Biniek + * @since February 2019 + */ +internal object FileConfigurationReaderTest : Spek({ + describe("A configuration loader utility") { + val cut = FileConfigurationReader() + + describe("partial configuration loading") { + it("parses enumerations") { + val input = """{"logLevel":"ERROR"}""" + + val config = cut.loadConfig(StringReader(input)) + assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) + } + + it("parses simple structure") { + val input = """{ + "server" : { + "healthCheckApiPort" : 12002, + "listenPort" : 12003 + } + } + """.trimIndent() + val config = cut.loadConfig(StringReader(input)) + assertThat(config.server.nonEmpty()).isTrue() + assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003)) + } + + it("parses ip address") { + val input = """{ "collector" : { + "kafkaServers": [ + "192.168.255.1:5005", + "192.168.255.26:5006" + ] + } + }""" + + val config = cut.loadConfig(StringReader(input)) + assertThat(config.collector.nonEmpty()).isTrue() + val collector = config.collector.orNull() as PartialCollectorConfig + assertThat(collector.kafkaServers.nonEmpty()).isTrue() + val addresses = collector.kafkaServers.orNull() as List + assertThat(addresses) + .isEqualTo(listOf( + InetSocketAddress("192.168.255.1", 5005), + InetSocketAddress("192.168.255.26", 5006) + )) + } + + it("parses routing array with RoutingAdapter") { + val input = """{ + "collector" : { + "routing" : [ + { + "fromDomain": "perf3gpp", + "toTopic": "HV_VES_PERF3GPP" + } + ] + } + }""".trimIndent() + val config = cut.loadConfig(StringReader(input)) + assertThat(config.collector.nonEmpty()).isTrue() + val collector = config.collector.orNull() as PartialCollectorConfig + assertThat(collector.routing.nonEmpty()).isTrue() + val routing = collector.routing.orNull() as Routing + routing.run { + assertThat(routes.size).isEqualTo(1) + assertThat(routes[0].domain).isEqualTo("perf3gpp") + assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP") + } + } + + it("parses invalid log level string to empty option") { + val input = """{ + "logLevel": something + }""".trimMargin() + val config = cut.loadConfig(input.reader()) + + assertThat(config.logLevel.isEmpty()) + } + } + + describe("complete file loading") { + it("loads actual file") { + val config = cut.loadConfig( + javaClass.resourceAsStream("/sampleConfig.json")) + + assertThat(config).isNotNull + assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR)) + + assertThat(config.security.nonEmpty()).isTrue() + val security = config.security.orNull() as PartialSecurityConfig + assertThat(security.keys.nonEmpty()).isTrue() + + assertThat(config.cbs.nonEmpty()).isTrue() + val cbs = config.cbs.orNull() as PartialCbsConfig + assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7)) + assertThat(cbs.requestIntervalSec).isEqualTo(Some(900)) + + assertThat(config.collector.nonEmpty()).isTrue() + val collector = config.collector.orNull() as PartialCollectorConfig + collector.run { + assertThat(dummyMode).isEqualTo(Some(false)) + assertThat(maxRequestSizeBytes).isEqualTo(Some(512000)) + assertThat(kafkaServers.nonEmpty()).isTrue() + assertThat(routing.nonEmpty()).isTrue() + } + + assertThat(config.server.nonEmpty()).isTrue() + val server = config.server.orNull() as PartialServerConfig + server.run { + assertThat(idleTimeoutSec).isEqualTo(Some(1200)) + assertThat(listenPort).isEqualTo(Some(6000)) + assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000)) + } + } + } + } +}) + diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json index b64df05a..b49085e8 100644 --- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json +++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json @@ -1,16 +1,16 @@ { - "server" : { - "healthCheckApiPort" : 5000, - "listenPort" : 6000, - "idleTimeoutSec" : 1200, - "maximumPayloadSizeBytes" : 512000, - "dummyMode" : false + "logLevel": "ERROR", + "server": { + "healthCheckApiPort": 5000, + "listenPort": 6000, + "idleTimeoutSec": 1200, + "maxPayloadSizeBytes": 512000 }, - "cbs" : { + "cbs": { "firstRequestDelaySec": 7, "requestIntervalSec": 900 }, - "security" : { + "security": { "sslDisable": false, "keys": { "keyStoreFile": "test.ks.pkcs12", @@ -19,7 +19,9 @@ "trustStorePassword": "changeMeToo" } }, - "kafka" : { + "collector": { + "dummyMode": false, + "maxRequestSizeBytes": 512000, "kafkaServers": [ "192.168.255.1:5005", "192.168.255.1:5006" @@ -30,6 +32,5 @@ "toTopic": "HV_VES_PERF3GPP" } ] - }, - "logLevel" : "ERROR" + } } \ No newline at end of file diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 84310802..782d2324 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.utils.Closeable import reactor.core.publisher.Flux @@ -48,5 +48,5 @@ interface SinkProvider : Closeable { } interface ConfigurationProvider { - operator fun invoke(): Flux + operator fun invoke(): Flux } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3ea14385..c08df748 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -20,13 +20,12 @@ package org.onap.dcae.collectors.veshv.factory import arrow.core.Option -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference * @author Piotr Jaszczyk * @since May 2018 */ -class CollectorFactory(val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maximumPayloadSizeBytes: Int, + private val maxPayloadSizeBytes: Int, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config: AtomicReference = AtomicReference() + val config = AtomicReference() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(config.routing, ctx), + router = Router(routing, ctx), sink = sinkProvider(ctx), metrics = metrics) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 58a8599a..6c4e4671 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory /** @@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory * @since May 2018 */ object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, + + private val sslFactory = SslContextFactory() + + fun createNettyTcpServer(serverConfig: ServerConfiguration, + securityConfig: SecurityConfiguration, collectorProvider: CollectorProvider, - metrics: Metrics): Server = - NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics) + metrics: Metrics + ): Server = NettyTcpServer( + serverConfig, + sslFactory.createServerContext(securityConfig), + collectorProvider, + metrics + ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index a853839a..c362020e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(dummyMode: Boolean, - kafkaConfig: KafkaConfiguration): SinkProvider = - if (dummyMode) + fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = + if (config.dummyMode) LoggingSinkProvider() else - KafkaSinkProvider(kafkaConfig) + KafkaSinkProvider(config) - fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - configurationProviderParams) + config) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 51b6d4f0..754a2efc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog @@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono ) : ConfigurationProvider { - constructor(cbsClientMono: Mono, params: ConfigurationProviderParams) : this( + constructor(cbsClientMono: Mono, params: CbsConfiguration) : this( cbsClientMono, params.firstRequestDelay, params.requestInterval, @@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono = + override fun invoke(): Flux = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux = cbsClient .updates(RequestDiagnosticContext.create(), firstRequestDelay, requestInterval) @@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono) : SinkProvider { - constructor(config: KafkaConfiguration) : this(constructKafkaSender(config)) + constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) @@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor( private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - private fun constructKafkaSender(config: KafkaConfiguration) = + + private fun constructKafkaSender(config: CollectorConfiguration) = KafkaSender.create(constructSenderOptions(config)) - private fun constructSenderOptions(config: KafkaConfiguration) = + private fun constructSenderOptions(config: CollectorConfiguration) = SenderOptions.create() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config)) + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) + .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) + .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) @@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor( .producerProperty(ACKS_CONFIG, "1") .stopOnError(false) - private fun maxRequestSize(config: KafkaConfiguration) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt() + private fun maxRequestSize(maxRequestSizeBytes: Int) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - private fun bufferMemory(config: KafkaConfiguration) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes) + private fun bufferMemory(maxRequestSizeBytes: Int) = + max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 123956ad..fab96560 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.core.Option import arrow.core.getOrElse import arrow.effects.IO +import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics @@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -41,6 +42,7 @@ import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer import java.net.InetAddress +import java.net.InetSocketAddress import java.time.Duration @@ -48,14 +50,14 @@ import java.time.Duration * @author Piotr Jaszczyk * @since May 2018 */ -internal class NettyTcpServer(private val serverConfig: ServerConfiguration, - private val sslContextFactory: SslContextFactory, +internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, + private val sslContext: Option, private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { override fun start(): IO = IO { TcpServer.create() - .addressSupplier { serverConfig.serverListenAddress } + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } .configureSsl() .handle(this::handleConnection) .doOnUnbound { @@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } private fun TcpServer.configureSsl() = - sslContextFactory - .createServerContext(serverConfig.securityConfiguration) - .map { sslContext -> + sslContext + .map { serverContext -> logger.info { "Collector configured with SSL enabled" } - this.secure { b -> b.sslContext(sslContext) } + this.secure { it.sslContext(serverContext) } }.getOrElse { logger.info { "Collector configured with SSL disabled" } this @@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, nettyInbound: NettyInbound): (Collector) -> Mono = { collector -> withConnectionFrom(nettyInbound) { connection -> connection - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { collector.handleConnection(nettyInbound.createDataStream()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index 21aaa129..f830f2c9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val route1 = it.routing.routes[0] + val route1 = it.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") .isEqualTo(route1.domain) @@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({ .describedAs("target topic 1") .isEqualTo(route1.targetTopic) - val route2 = it.routing.routes[1] + val route2 = it.routes[1] assertThat(HEARTBEAT.domainName) .describedAs("routed domain 2") .isEqualTo(route2.domain) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 068476ad..1e3f2e7a 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.ves.VesEventOuterClass import reactor.kafka.sender.KafkaSender @@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender internal object KafkaSinkProviderTest : Spek({ describe("non functional requirements") { given("sample configuration") { - val config = KafkaConfiguration("localhost:9090", - 1024 * 1024) + val config = CollectorConfiguration( + dummyMode = false, + maxRequestSizeBytes = 1024 * 1024, + kafkaServers = "localhost:9090", + routing = routing { }.build()) + val cut = KafkaSinkProvider(config) on("sample clients") { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index aaa3ee3b..bd056d4d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(basicConfiguration) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index dc5fe60b..ece42285 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index d97541ba..e84e9486 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,11 +27,18 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.tests.fakes.* +import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink +import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink +import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState +import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -40,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk * @since May 2018 */ -class Sut(sink: Sink = StoringSink()): AutoCloseable { +class Sut(sink: Sink = StoringSink()) : AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -94,17 +101,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index ed46b119..17f6ce32 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,21 +24,24 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.* - +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader +import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload import reactor.core.publisher.Flux import java.time.Duration @@ -149,7 +152,7 @@ object VesHvSpecification : Spek({ it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -210,12 +213,12 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) @@ -317,7 +320,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -346,6 +349,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index c7e12bbd..1ad2b0e3 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,12 +20,11 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT - +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -35,62 +34,55 @@ const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -val basicConfiguration: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) - -val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } - }.build() -) +val basicRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } +}.build() -val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) +val twoDomainsToOneTopicRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(HEARTBEAT.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(MEASUREMENT.domainName) + toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + withFixedPartitioning() + } +}.build() + + +val configurationWithDifferentRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(ALTERNATE_PERF3GPP_TOPIC) + withFixedPartitioning() + } +}.build() + +val emptyRouting = routing { }.build() -val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( - routing = routing { - }.build() -) class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor = UnicastProcessor.create() + private val configStream: FluxProcessor = UnicastProcessor.create() - fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(collectorConfiguration) + configStream.onNext(routing) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index dc9be16b..f6d1eab7 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -20,9 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config import arrow.core.Option -import arrow.core.fix -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration @@ -34,6 +31,7 @@ import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYL import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import java.net.InetSocketAddress class ArgDcaeAppSimConfiguration : ArgBasedConfiguration(DefaultParser()) { @@ -45,7 +43,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration = - Option.monad().binding { + binding { val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind() @@ -62,5 +60,5 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration) = - ConfigurationModule() - .createConfigurationFromCommandLine(args) - .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startAndAwaitServers) - .unsafeRunEitherSync( - { ex -> - logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } - ExitFailure(1) - }, - { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } - ) +private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv" +private val logger = Logger("$VES_HV_PACKAGE.main") -private fun startAndAwaitServers(config: ServerConfiguration) = - IO.monad().binding { - Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } +private val hvVesServer = AtomicReference() - val healthCheckServerHandle = HealthCheckServer.start(config).bind() - val hvVesHandle = VesServer.start(config).bind() +fun main(args: Array) { + HealthCheckServer.start() + ConfigurationModule() + .hvVesConfigurationUpdates(args) + .publishOn(Schedulers.single(Schedulers.elastic())) + .doOnNext(::startServer) + .doOnError(::logServerStartFailed) + .neverComplete() // TODO: remove after merging configuration stream with cbs + .block() +} + +private fun startServer(config: HvVesConfiguration) { + stopRunningServer() + Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) + logger.info { "Using configuration: $config" } + + VesServer.start(config).let { + registerShutdownHook { shutdownGracefully(it) } + hvVesServer.set(it) + } +} - registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) - hvVesHandle.await().bind() - }.fix() +private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync() -internal fun closeServers(vararg handles: ServerHandle, - healthState: HealthState = HealthState.INSTANCE) = { +internal fun shutdownGracefully(serverHandle: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) { logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).unsafeRunSync() + serverHandle.close().unsafeRunSync() logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } } + +private fun logServerStartFailed(ex: Throwable) = + logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } + diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index 15472b5e..bc284d08 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -19,25 +19,38 @@ */ package org.onap.dcae.collectors.veshv.main.servers -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.net.InetSocketAddress /** * @author Piotr Jaszczyk * @since August 2018 */ -object HealthCheckServer : ServerStarter() { - override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() +object HealthCheckServer { - private fun createHealthCheckServer(config: ServerConfiguration) = + private const val DEFAULT_HEALTHCHECK_PORT = 6060 + private val logger = Logger(HealthCheckServer::class) + + fun start(port: Int = DEFAULT_HEALTHCHECK_PORT) = + createHealthCheckServer(port) + .start() + .then(::logServerStarted) + .unsafeRunSync() + + private fun createHealthCheckServer(listenPort: Int) = HealthCheckApiServer( HealthState.INSTANCE, MicrometerMetrics.INSTANCE.metricsProvider, - config.healthCheckApiListenAddress) + InetSocketAddress(listenPort)) - override fun serverStartedMessage(handle: ServerHandle) = - "Health check server is up and listening on ${handle.host}:${handle.port}" + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "Health check server is up and listening on ${handle.host}:${handle.port}" + } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt deleted file mode 100644 index 74a66324..00000000 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main.servers - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.then -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -/** - * @author Piotr Jaszczyk - * @since August 2018 - */ -abstract class ServerStarter { - fun start(config: ServerConfiguration): IO = - startServer(config) - .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } } - - protected abstract fun startServer(config: ServerConfiguration): IO - protected abstract fun serverStartedMessage(handle: ServerHandle): String - - companion object { - private val logger = Logger(ServerStarter::class) - } -} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 0f5e45ec..d15dccef 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -19,33 +19,54 @@ */ package org.onap.dcae.collectors.veshv.main.servers -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger /** * @author Piotr Jaszczyk * @since August 2018 */ -object VesServer : ServerStarter() { - override fun startServer(config: ServerConfiguration): IO = createVesServer(config).start() - - private fun createVesServer(config: ServerConfiguration): Server { - val collectorProvider = CollectorFactory( - AdapterFactory.configurationProvider(config.configurationProviderParams), - AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), - MicrometerMetrics.INSTANCE, - config.maximumPayloadSizeBytes - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE) - } - - override fun serverStartedMessage(handle: ServerHandle) = - "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +object VesServer { + + private val logger = Logger(VesServer::class) + + fun start(config: HvVesConfiguration): ServerHandle = + createVesServer(config) + .start() + .then(::logServerStarted) + .unsafeRunSync() + + private fun createVesServer(config: HvVesConfiguration): Server = + initializeCollectorFactory(config) + .createVesHvCollectorProvider() + .let { collectorProvider -> + ServerFactory.createNettyTcpServer( + config.server, + config.security, + collectorProvider, + MicrometerMetrics.INSTANCE + ) + } + + private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = + CollectorFactory( + AdapterFactory.configurationProvider(config.cbs), + AdapterFactory.sinkCreatorFactory(config.collector), + MicrometerMetrics.INSTANCE, + config.server.maxPayloadSizeBytes + ) + + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" + } + } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index e18b0b10..d8de9f25 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -42,7 +42,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle internal object MainTest : Spek({ describe("closeServer shutdown hook") { given("server handles and health state") { - val handle: ServerHandle = mock() + val handle = mock() var closed = false val handleClose = IO { closed = true @@ -50,8 +50,8 @@ internal object MainTest : Spek({ whenever(handle.close()).thenReturn(handleClose) val healthState: HealthState = mock() - on("closeServers") { - closeServers(handle, healthState = healthState).invoke() + on("shutdownGracefully") { + shutdownGracefully(handle, healthState = healthState) it("should close all handles") { assertThat(closed).isTrue() diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt index fb211115..579eb84c 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt @@ -21,11 +21,9 @@ package org.onap.dcae.collectors.veshv.ssl.boundary import arrow.core.Option import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.nio.file.Path /** * @author Piotr Jaszczyk * @since May 2018 */ -data class SecurityConfiguration( - val keys: Option) +data class SecurityConfiguration(val keys: Option) diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt index 805d94d2..f72ddecb 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt @@ -30,7 +30,7 @@ import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory class SslContextFactory(private val sslFactory: SslFactory = SslFactory()) { fun createServerContext(secConfig: SecurityConfiguration): Option = secConfig.keys.map { sslFactory.createSecureServerContext(it) } + fun createClientContext(secConfig: SecurityConfiguration): Option = secConfig.keys.map { sslFactory.createSecureClientContext(it) } - } diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt index f8632350..822d84f1 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt @@ -51,7 +51,7 @@ fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> Securit private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE) -private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None) +private fun disabledSecurityConfiguration() = SecurityConfiguration(None) private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration { val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE) @@ -66,8 +66,7 @@ private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfigur .trustStorePassword(Passwords.fromString(tsPass)) .build() - return SecurityConfiguration(keys = Some(keys)) + return SecurityConfiguration(Some(keys)) } - private fun pathFromFile(file: String) = Paths.get(file) diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt new file mode 100644 index 00000000..24fedbbc --- /dev/null +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt @@ -0,0 +1,28 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.utils + +import java.io.InputStreamReader + +fun Class.resourceAsStream(resourcePath: String): InputStreamReader = + InputStreamReader(getResourceAsStream(resourcePath)) + +fun Class.absoluteResourcePath(resourcePath: String): String = + getResource(resourcePath).path diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index bedc2fcd..d5b33b91 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -20,10 +20,20 @@ package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either +import arrow.core.ForOption import arrow.core.Option import arrow.core.Try +import arrow.core.fix import arrow.core.identity +import arrow.effects.ForIO +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monad.monad +import arrow.instances.option.monad.monad import arrow.syntax.collections.firstOption +import arrow.typeclasses.MonadContinuation +import arrow.typeclasses.binding +import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference /** @@ -31,12 +41,24 @@ import java.util.concurrent.atomic.AtomicReference * @since July 2018 */ +object OptionUtils { + fun binding(c: suspend MonadContinuation.() -> A) + : Option = Option.monad().binding(c).fix() +} + +object IOUtils { + fun binding(c: suspend MonadContinuation.() -> A) + : IO = IO.monad().binding(c).fix() +} + fun Either.flatten() = fold(::identity, ::identity) fun Either.rightOrThrow() = fold({ throw it }, ::identity) fun Either.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity) +fun Flux>.throwOnLeft(f: (A) -> Exception): Flux = map { it.rightOrThrow(f) } + fun AtomicReference.getOption() = Option.fromNullable(get()) fun Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option = @@ -57,3 +79,13 @@ fun Try.doOnFailure(action: (Throwable) -> Unit): Try = apply { action(exception) } } + +fun A.mapBinding(c: suspend MonadContinuation.(A) -> B) + : Option = let { OptionUtils.binding { c(it) } } + + + + + + + diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt new file mode 100644 index 00000000..aaa598d2 --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt @@ -0,0 +1,25 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +fun Flux.neverComplete(): Mono = then(Mono.never()).then() \ No newline at end of file diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt index 87aea41e..cc940907 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt @@ -19,10 +19,19 @@ */ package org.onap.dcae.collectors.veshv.utils +import java.util.concurrent.atomic.AtomicReference + /** * @author Piotr Jaszczyk * @since January 2019 */ -fun registerShutdownHook(job: () -> Unit) = - Runtime.getRuntime() - .addShutdownHook(Thread({ job() }, "GracefulShutdownThread")) + +private val currentShutdownHook = AtomicReference() + +fun registerShutdownHook(job: () -> Unit) { + val runtime = Runtime.getRuntime() + val newShutdownHook = Thread({ job() }, "GracefulShutdownThread") + currentShutdownHook.get()?.run(runtime::removeShutdownHook) + currentShutdownHook.set(newShutdownHook) + runtime.addShutdownHook(newShutdownHook) +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 08c05e05..28cc0556 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -20,9 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import arrow.core.Option -import arrow.core.fix -import arrow.instances.option.monad.monad -import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration @@ -40,6 +37,7 @@ import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.net.InetSocketAddress @@ -62,7 +60,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration = - Option.monad().binding { + binding { val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() val vesPort = cmdLine.intValue(VES_HV_PORT).bind() @@ -86,7 +84,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration) = ArgXnfSimulatorConfiguration().parse(args) ) private fun startServers(config: SimulatorConfiguration): IO = - IO.monad().binding { + binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() @@ -79,5 +77,5 @@ private fun startServers(config: SimulatorConfiguration): IO = HealthState.INSTANCE.changeState(HealthDescription.IDLE) xnfApiServerHandler.await().bind() - }.fix() + } -- cgit 1.2.3-korg