aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--development/configuration/configuration.json34
-rw-r--r--development/docker-compose.yml14
-rw-r--r--development/logs/.gitignore1
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt37
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt169
-rw-r--r--sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt4
-rw-r--r--sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt7
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt22
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt)30
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt)10
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt)30
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt36
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt151
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt118
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt35
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt192
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt73
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt138
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt (renamed from sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt)78
-rw-r--r--sources/hv-collector-configuration/src/test/resources/sampleConfig.json23
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt34
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt25
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt19
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt4
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt13
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt14
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt6
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt27
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt29
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt88
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt8
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt72
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt27
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt44
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt57
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt6
-rw-r--r--sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt4
-rw-r--r--sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt2
-rw-r--r--sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt5
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt)17
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt32
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt)13
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt15
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt8
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt8
49 files changed, 954 insertions, 877 deletions
diff --git a/development/configuration/configuration.json b/development/configuration/configuration.json
new file mode 100644
index 00000000..8e55cf39
--- /dev/null
+++ b/development/configuration/configuration.json
@@ -0,0 +1,34 @@
+{
+ "logLevel": "DEBUG",
+ "server": {
+ "listenPort": 6061,
+ "idleTimeoutSec": 60,
+ "maxPayloadSizeBytes": 1048576
+ },
+ "cbs": {
+ "firstRequestDelaySec": 10,
+ "requestIntervalSec": 5
+ },
+ "security": {
+ "sslDisable": false,
+ "keys": {
+ "keyStoreFile": "/etc/ves-hv/ssl/server.p12",
+ "keyStorePassword": "onaponap",
+ "trustStoreFile": "/etc/ves-hv/ssl/trust.p12",
+ "trustStorePassword": "onaponap"
+ }
+ },
+ "collector": {
+ "dummyMode": false,
+ "maxRequestSizeBytes": 1048576,
+ "kafkaServers": [
+ "message-router-kafka:9092"
+ ],
+ "routing": [
+ {
+ "fromDomain": "perf3gpp",
+ "toTopic": "HV_VES_PERF3GPP"
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index abd55ab9..85500cbb 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -73,15 +73,9 @@ services:
ports:
- "6060:6060"
- "6061:6061/tcp"
- command: ["--listen-port", "6061",
- "--health-check-api-port", "6060",
- "--kafka-bootstrap-servers", "message-router-kafka:9092",
- "--key-store-password", "onaponap",
- "--trust-store-password", "onaponap",
- "--first-request-delay", "5",
- "--log-level", "DEBUG"]
+ command: ["--configuration-file /etc/ves-hv/configuration/configuration.json"]
environment:
- JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
+ JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
CONSUL_HOST: "consul-server"
CONFIG_BINDING_SERVICE: "cbs"
HOSTNAME: "dcae-hv-ves-collector"
@@ -95,7 +89,8 @@ services:
- message-router-kafka
- config-binding-service
volumes:
- - ./ssl/:/etc/ves-hv/
+ - ./configuration/:/etc/ves-hv/configuration/
+ - ./ssl/:/etc/ves-hv/ssl/
- ./logs:/var/log/ONAP/dcae-hv-ves-collector
@@ -139,6 +134,7 @@ services:
#
# Monitoring
#
+
prometheus:
image: prom/prometheus
ports:
diff --git a/development/logs/.gitignore b/development/logs/.gitignore
index f935021a..1287e9bd 100644
--- a/development/logs/.gitignore
+++ b/development/logs/.gitignore
@@ -1 +1,2 @@
+**
!.gitignore
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
index 1e45c923..93d42f96 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
@@ -33,25 +33,24 @@ import java.nio.file.Paths
abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
abstract val cmdLineOptionsList: List<CommandLineOption>
- fun parse(args: Array<out String>): Either<WrongArgumentError, T> {
- val parseResult = Try {
- val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption)
- parser.parse(commandLineOptions, args)
- }
- return parseResult
- .toEither()
- .mapLeft { ex -> WrongArgumentError(ex, cmdLineOptionsList) }
- .map(this::getConfiguration)
- .flatMap {
- it.toEither {
- WrongArgumentError(
- message = "Unexpected error when parsing command line arguments",
- cmdLineOptionsList = cmdLineOptionsList)
- }
- }
- }
-
protected abstract fun getConfiguration(cmdLine: CommandLine): Option<T>
- protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI())
+ fun parse(args: Array<out String>): Either<WrongArgumentError, T> =
+ Try { parseArgumentsArray(args) }
+ .toEither()
+ .mapLeft { WrongArgumentError(it, cmdLineOptionsList) }
+ .map(this::getConfiguration)
+ .flatMap {
+ it.toEither {
+ WrongArgumentError(
+ message = "Unexpected error when parsing command line arguments",
+ cmdLineOptionsList = cmdLineOptionsList)
+ }
+ }
+
+ private fun parseArgumentsArray(args: Array<out String>) =
+ cmdLineOptionsList
+ .map { it.option }
+ .fold(Options(), Options::addOption)
+ .let { parser.parse(it, args) }
}
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
index 31849215..1c1a355b 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
@@ -24,78 +24,72 @@ import org.apache.commons.cli.Option
enum class CommandLineOption(val option: Option, val required: Boolean = false) {
HEALTH_CHECK_API_PORT(
- Option.builder("H")
- .longOpt("health-check-api-port")
- .hasArg()
- .desc("Health check rest api listen port")
- .build()
- ),
- LISTEN_PORT(
- Option.builder("p")
- .longOpt("listen-port")
- .hasArg()
- .desc("Listen port")
- .build(),
- required = true
+ Option.builder("H")
+ .longOpt("health-check-api-port")
+ .hasArg()
+ .desc("Health check rest api listen port")
+ .build()
),
- CONFIGURATION_FIRST_REQUEST_DELAY(
- Option.builder("d")
- .longOpt("first-request-delay")
- .hasArg()
- .desc("Delay of first request for configuration in seconds")
- .build()
+ CONFIGURATION_FILE(
+ Option.builder("c")
+ .longOpt("configuration-file")
+ .hasArg()
+ .desc("Json file containing HV-VES configuration")
+ .build(),
+ required = true
),
- CONFIGURATION_REQUEST_INTERVAL(
- Option.builder("I")
- .longOpt("request-interval")
- .hasArg()
- .desc("Interval of configuration requests in seconds")
- .build()
+ LISTEN_PORT(
+ Option.builder("p")
+ .longOpt("listen-port")
+ .hasArg()
+ .desc("Listen port")
+ .build(),
+ required = true
),
VES_HV_PORT(
- Option.builder("v")
- .longOpt("ves-port")
- .hasArg()
- .desc("VesHvCollector port")
- .build(),
- required = true
+ Option.builder("v")
+ .longOpt("ves-port")
+ .hasArg()
+ .desc("VesHvCollector port")
+ .build(),
+ required = true
),
VES_HV_HOST(
- Option.builder("h")
- .longOpt("ves-host")
- .hasArg()
- .desc("VesHvCollector host")
- .build(),
- required = true
+ Option.builder("h")
+ .longOpt("ves-host")
+ .hasArg()
+ .desc("VesHvCollector host")
+ .build(),
+ required = true
),
KAFKA_SERVERS(
- Option.builder("s")
- .longOpt("kafka-bootstrap-servers")
- .hasArg()
- .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
- .build(),
- required = true
+ Option.builder("s")
+ .longOpt("kafka-bootstrap-servers")
+ .hasArg()
+ .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
+ .build(),
+ required = true
),
KAFKA_TOPICS(
- Option.builder("f")
- .longOpt("kafka-topics")
- .hasArg()
- .desc("Comma-separated Kafka topics")
- .build(),
- required = true
+ Option.builder("f")
+ .longOpt("kafka-topics")
+ .hasArg()
+ .desc("Comma-separated Kafka topics")
+ .build(),
+ required = true
),
SSL_DISABLE(
- Option.builder("l")
- .longOpt("ssl-disable")
- .desc("Disable SSL encryption")
- .build()
+ Option.builder("l")
+ .longOpt("ssl-disable")
+ .desc("Disable SSL encryption")
+ .build()
),
KEY_STORE_FILE(
- Option.builder("k")
- .longOpt("key-store")
- .hasArg()
- .desc("Key store in PKCS12 format")
- .build()
+ Option.builder("k")
+ .longOpt("key-store")
+ .hasArg()
+ .desc("Key store in PKCS12 format")
+ .build()
),
KEY_STORE_PASSWORD(
Option.builder("kp")
@@ -105,54 +99,31 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
.build()
),
TRUST_STORE_FILE(
- Option.builder("t")
- .longOpt("trust-store")
- .hasArg()
- .desc("File with trusted certificate bundle in PKCS12 format")
- .build()
+ Option.builder("t")
+ .longOpt("trust-store")
+ .hasArg()
+ .desc("File with trusted certificate bundle in PKCS12 format")
+ .build()
),
TRUST_STORE_PASSWORD(
- Option.builder("tp")
- .longOpt("trust-store-password")
- .hasArg()
- .desc("Trust store password")
- .build()
- ),
- IDLE_TIMEOUT_SEC(
- Option.builder("i")
- .longOpt("idle-timeout-sec")
- .hasArg()
- .desc(
- """Idle timeout for remote hosts. After given time without any data exchange the
- |connection might be closed.""".trimMargin()
- )
- .build()
+ Option.builder("tp")
+ .longOpt("trust-store-password")
+ .hasArg()
+ .desc("Trust store password")
+ .build()
),
MAXIMUM_PAYLOAD_SIZE_BYTES(
- Option.builder("m")
- .longOpt("max-payload-size")
- .hasArg()
- .desc("Maximum supported payload size in bytes")
- .build()
- ),
- LOG_LEVEL(
- Option.builder("ll")
- .longOpt("log-level")
- .hasArg()
- .desc("Log level")
- .build()
- ),
- DUMMY_MODE(
- Option.builder("u")
- .longOpt("dummy")
- .desc("If present will start in dummy mode (dummy external services)")
- .build()
+ Option.builder("m")
+ .longOpt("max-payload-size")
+ .hasArg()
+ .desc("Maximum supported payload size in bytes")
+ .build()
);
fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
- option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
- "${prefix}_${mainPart}"
- }
+ option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
+ "${prefix}_${mainPart}"
+ }
companion object {
private const val DEFAULT_ENV_PREFIX = "VESHV"
diff --git a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
index c0fbcde6..48cac69a 100644
--- a/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
+++ b/sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
@@ -32,13 +32,13 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
* @since June 2018
*/
+val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried()
+
fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO<Unit> = IO {
err.printMessage()
err.printHelp(programName)
}.flatMap { ExitFailure(2).io() }
-val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried()
-
fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
longValue(cmdLineOpt).getOrElse { default }
diff --git a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
index 736710ff..6614e77f 100644
--- a/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
+++ b/sources/hv-collector-commandline/src/test/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOptionTest.kt
@@ -25,8 +25,7 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.*
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -49,13 +48,13 @@ class CommandLineOptionTest : Spek({
}
given("sample option without prefix") {
- val opt = DUMMY_MODE
+ val opt = SSL_DISABLE
on("calling environmentVariableName") {
val result = opt.environmentVariableName()
it("should return prefixed upper snake cased long option name") {
- assertThat(result).isEqualTo("VESHV_DUMMY")
+ assertThat(result).isEqualTo("VESHV_SSL_DISABLE")
}
}
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index 874cc5b0..5b547a28 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -19,13 +19,25 @@
*/
package org.onap.dcae.collectors.veshv.config.api
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
+import org.onap.dcae.collectors.veshv.config.impl.ArgHvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
+import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
+import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
import reactor.core.publisher.Flux
class ConfigurationModule {
- fun createConfigurationFromCommandLine(args: Array<String>) =
- ArgVesHvConfiguration().parse(args)
- fun hvVesConfigurationUpdates(): Flux<ServerConfiguration> = Flux.never<ServerConfiguration>()
+ private val cmd = ArgHvVesConfiguration()
+ private val configReader = FileConfigurationReader()
+ private val configValidator = ConfigurationValidator()
+
+ fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
+ Flux.just(cmd.parse(args))
+ .throwOnLeft { MissingArgumentException(it.message, it.cause) }
+ .map { configReader.loadConfig(it.reader()) }
+ .map { configValidator.validate(it) }
+ .throwOnLeft { ValidationException(it.message) }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
index 8fbc649b..566f2c08 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ServerConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
@@ -21,22 +21,34 @@ package org.onap.dcae.collectors.veshv.config.api.model
import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.net.InetSocketAddress
import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
+data class HvVesConfiguration(
+ val server: ServerConfiguration,
+ val cbs: CbsConfiguration,
+ val security: SecurityConfiguration,
+ val collector: CollectorConfiguration,
+ val logLevel: LogLevel
+)
+
data class ServerConfiguration(
- val serverListenAddress: InetSocketAddress,
- val kafkaConfiguration: KafkaConfiguration,
- val configurationProviderParams: ConfigurationProviderParams,
- val securityConfiguration: SecurityConfiguration,
+ val listenPort: Int,
val idleTimeout: Duration,
- val healthCheckApiListenAddress: InetSocketAddress,
- val maximumPayloadSizeBytes: Int,
- val logLevel: LogLevel,
- val dummyMode: Boolean = false
+ val maxPayloadSizeBytes: Int
)
+data class CbsConfiguration(
+ val firstRequestDelay: Duration,
+ val requestInterval: Duration
+)
+
+data class CollectorConfiguration(
+ val maxRequestSizeBytes: Int,
+ val kafkaServers: String,
+ val routing: Routing,
+ val dummyMode: Boolean = false
+)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt
index 7999451e..2fc29829 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/CollectorConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,8 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.config.api.model
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class CollectorConfiguration(val routing: Routing)
+class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause)
+
+class ValidationException(message: String) : RuntimeException(message)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
index 847f35ad..aab8ecad 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ data class Routing(val routes: List<Route>) {
Option.fromNullable(routes.find { it.applies(commonHeader) })
}
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = {0}) {
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
@@ -40,24 +40,17 @@ data class Route(val domain: String, val targetTopic: String, val partitioning:
/*
-Configuration DSL
- */
+HvVesConfiguration DSL
+*/
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
- val conf = RoutingBuilder()
- conf.init()
- return conf
-}
+fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
class RoutingBuilder {
private val routes: MutableList<RouteBuilder> = mutableListOf()
- fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
- val rule = RouteBuilder()
- rule.init()
- routes.add(rule)
- return rule
- }
+ fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
+ .apply(init)
+ .also { routes.add(it) }
fun build() = Routing(routes.map { it.build() }.toList())
}
@@ -68,18 +61,17 @@ class RouteBuilder {
private lateinit var targetTopic: String
private lateinit var partitioning: (CommonEventHeader) -> Int
- fun fromDomain(domain: String) : RouteBuilder = apply {
+ fun fromDomain(domain: String): RouteBuilder = apply {
this.domain = domain
}
- fun toTopic(targetTopic: String) : RouteBuilder = apply {
+ fun toTopic(targetTopic: String): RouteBuilder = apply {
this.targetTopic = targetTopic
}
- fun withFixedPartitioning(num: Int = 0) : RouteBuilder = apply {
+ fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
partitioning = { num }
}
fun build() = Route(domain, targetTopic, partitioning)
-
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt
new file mode 100644
index 00000000..9587d5b0
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfiguration.kt
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Option
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FILE
+import org.onap.dcae.collectors.veshv.commandline.stringValue
+import java.io.File
+
+internal class ArgHvVesConfiguration : ArgBasedConfiguration<File>(DefaultParser()) {
+ override val cmdLineOptionsList = listOf(CONFIGURATION_FILE)
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<File> =
+ cmdLine.stringValue(CONFIGURATION_FILE).map(::File)
+
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt
deleted file mode 100644
index 08346d30..00000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgVesHvConfiguration.kt
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl
-
-import arrow.core.Option
-import arrow.core.fix
-import arrow.core.getOrElse
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
-import org.apache.commons.cli.CommandLine
-import org.apache.commons.cli.DefaultParser
-import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KAFKA_SERVERS
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_FILE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.KEY_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LOG_LEVEL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.SSL_DISABLE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_FILE
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.TRUST_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.commandline.hasOption
-import org.onap.dcae.collectors.veshv.commandline.intValue
-import org.onap.dcae.collectors.veshv.commandline.longValue
-import org.onap.dcae.collectors.veshv.commandline.stringValue
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
-import java.time.Duration
-
-
-internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
- override val cmdLineOptionsList = listOf(
- KAFKA_SERVERS,
- HEALTH_CHECK_API_PORT,
- LISTEN_PORT,
- CONFIGURATION_FIRST_REQUEST_DELAY,
- CONFIGURATION_REQUEST_INTERVAL,
- SSL_DISABLE,
- KEY_STORE_FILE,
- KEY_STORE_PASSWORD,
- TRUST_STORE_FILE,
- TRUST_STORE_PASSWORD,
- IDLE_TIMEOUT_SEC,
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- DUMMY_MODE,
- LOG_LEVEL
- )
-
- override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
- Option.monad().binding {
- val healthCheckApiPort = cmdLine.intValue(
- HEALTH_CHECK_API_PORT,
- DefaultValues.HEALTH_CHECK_API_PORT
- )
- val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
- val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
- val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
- val maxPayloadSizeBytes = cmdLine.intValue(
- MAXIMUM_PAYLOAD_SIZE_BYTES,
- DefaultValues.MAX_PAYLOAD_SIZE_BYTES
- )
- val dummyMode = cmdLine.hasOption(DUMMY_MODE)
- val security = createSecurityConfiguration(cmdLine)
- .doOnFailure { ex ->
- logger.withError { log("Could not read security keys", ex) }
- }
- .toOption()
- .bind()
- val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL)
- val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
- ServerConfiguration(
- serverListenAddress = InetSocketAddress(listenPort),
- kafkaConfiguration = KafkaConfiguration(kafkaServers, maxPayloadSizeBytes),
- healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
- configurationProviderParams = configurationProviderParams,
- securityConfiguration = security,
- idleTimeout = Duration.ofSeconds(idleTimeoutSec),
- maximumPayloadSizeBytes = maxPayloadSizeBytes,
- dummyMode = dummyMode,
- logLevel = determineLogLevel(logLevel)
- )
- }.fix()
-
- private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
- Option.monad().binding {
- val firstRequestDelay = cmdLine.longValue(
- CONFIGURATION_FIRST_REQUEST_DELAY,
- DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY
- )
- val requestInterval = cmdLine.longValue(
- CONFIGURATION_REQUEST_INTERVAL,
- DefaultValues.CONFIGURATION_REQUEST_INTERVAL
- )
- ConfigurationProviderParams(
- Duration.ofSeconds(firstRequestDelay),
- Duration.ofSeconds(requestInterval)
- )
- }.fix()
-
- private fun determineLogLevel(logLevel: String) = LogLevel.optionFromString(logLevel)
- .getOrElse {
- logger.warn {
- "Failed to parse $logLevel as $LOG_LEVEL command line. " +
- "Using default log level (${DefaultValues.LOG_LEVEL})"
- }
- LogLevel.valueOf(DefaultValues.LOG_LEVEL)
- }
-
-
- internal object DefaultValues {
- const val HEALTH_CHECK_API_PORT = 6060
- const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L
- const val CONFIGURATION_REQUEST_INTERVAL = 5L
- const val IDLE_TIMEOUT_SEC = 60L
- const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
- val LOG_LEVEL = LogLevel.INFO.name
- }
-
- companion object {
- private val logger = Logger(ArgVesHvConfiguration::class)
- }
-}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
new file mode 100644
index 00000000..6c74c33f
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Either
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
+import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.net.InetSocketAddress
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since March 2019
+ */
+internal class ConfigurationValidator {
+
+ fun validate(partialConfig: PartialConfiguration)
+ : Either<ValidationError, HvVesConfiguration> = binding {
+ val logLevel = determineLogLevel(partialConfig.logLevel)
+
+ val serverConfiguration = partialConfig.server.bind()
+ .let { createServerConfiguration(it).bind() }
+
+ val cbsConfiguration = partialConfig.cbs.bind()
+ .let { createCbsConfiguration(it).bind() }
+
+ val securityConfiguration = partialConfig.security.bind()
+ .let { createSecurityConfiguration(it).bind() }
+
+ val collectorConfiguration = partialConfig.collector.bind()
+ .let { createCollectorConfig(it).bind() }
+
+ HvVesConfiguration(
+ serverConfiguration,
+ cbsConfiguration,
+ securityConfiguration,
+ collectorConfiguration,
+ logLevel
+ )
+ }.toEither { ValidationError("Some required configuration options are missing") }
+
+ private fun determineLogLevel(logLevel: Option<LogLevel>) =
+ logLevel.getOrElse {
+ logger.warn {
+ "Missing or invalid \"logLevel\" field. " +
+ "Using default log level ($DEFAULT_LOG_LEVEL)"
+ }
+ DEFAULT_LOG_LEVEL
+ }
+
+ private fun createServerConfiguration(partial: PartialServerConfig) =
+ partial.mapBinding {
+ ServerConfiguration(
+ it.listenPort.bind(),
+ Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()),
+ it.maxPayloadSizeBytes.bind()
+ )
+ }
+
+ private fun createCbsConfiguration(partial: PartialCbsConfig) =
+ partial.mapBinding {
+ CbsConfiguration(
+ Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()),
+ Duration.ofSeconds(it.requestIntervalSec.bind().toLong())
+ )
+ }
+
+ private fun createSecurityConfiguration(partial: PartialSecurityConfig) =
+ partial.keys.map { SecurityConfiguration(Some(it)) }
+
+ private fun createCollectorConfig(partial: PartialCollectorConfig) =
+ partial.mapBinding {
+ CollectorConfiguration(
+ it.maxRequestSizeBytes.bind(),
+ toKafkaServersString(it.kafkaServers.bind()),
+ it.routing.bind(),
+ it.dummyMode.bind()
+ )
+ }
+
+ private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String =
+ kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" }
+
+ companion object {
+ val DEFAULT_LOG_LEVEL = LogLevel.INFO
+ private val logger = Logger(ConfigurationValidator::class)
+ }
+}
+
+data class ValidationError(val message: String, val cause: Option<Throwable> = None)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
index 8d9cca73..3e6df3e0 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
+import arrow.core.None
import arrow.core.Option
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
@@ -30,31 +31,29 @@ import java.net.InetSocketAddress
* @since February 2019
*/
internal data class PartialConfiguration(
- val server : Option<PartialServerConfig>,
- val cbs : Option<PartialCbsConfig>,
- val security : Option<PartialSecurityConfig>,
- val kafka : Option<PartialKafkaConfig>,
- val logLevel : Option<LogLevel>
+ val server: Option<PartialServerConfig> = None,
+ val cbs: Option<PartialCbsConfig> = None,
+ val security: Option<PartialSecurityConfig> = None,
+ val collector: Option<PartialCollectorConfig> = None,
+ val logLevel: Option<LogLevel> = None
)
internal data class PartialServerConfig(
- val healthCheckApiPort : Option<Int>,
- val listenPort : Option<Int>,
- val idleTimeoutSec : Option<Int>,
- val maximumPayloadSizeBytes : Option<Int>,
- val dummyMode : Option<Boolean>
+ val listenPort: Option<Int> = None,
+ val idleTimeoutSec: Option<Int> = None,
+ val maxPayloadSizeBytes: Option<Int> = None
)
internal data class PartialCbsConfig(
- val firstRequestDelaySec : Option<Int>,
- val requestIntervalSec : Option<Int>
+ val firstRequestDelaySec: Option<Int> = None,
+ val requestIntervalSec: Option<Int> = None
)
-internal data class PartialSecurityConfig(
- val sslDisable : Option<Boolean>,
- val keys : Option<SecurityKeys>)
+internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
-internal data class PartialKafkaConfig(
- val kafkaServers : Option<Array<InetSocketAddress>>,
- val routing : Option<Routing>
+internal data class PartialCollectorConfig(
+ val dummyMode: Option<Boolean> = None,
+ val maxRequestSizeBytes: Option<Int> = None,
+ val kafkaServers: Option<List<InetSocketAddress>> = None,
+ val routing: Option<Routing> = None
)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt
deleted file mode 100644
index 0f8d8d0e..00000000
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ArgVesHvConfigurationTest.kt
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.api
-
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.config.impl.ArgVesHvConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
-import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.time.Duration
-import kotlin.test.assertNotNull
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object ArgVesHvConfigurationTest : Spek({
- lateinit var cut: ArgVesHvConfiguration
- val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
- val healthCheckApiPort = "6070"
- val firstRequestDelay = "10"
- val requestInterval = "5"
- val listenPort = "6969"
- val keyStorePassword = "kspass"
- val trustStorePassword = "tspass"
- val logLevel = LogLevel.DEBUG.name
-
- beforeEachTest {
- cut = ArgVesHvConfiguration()
- }
-
- describe("parsing arguments") {
- given("all parameters are present in the long form") {
- lateinit var result: ServerConfiguration
-
- beforeEachTest {
- result = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword,
- "--log-level", logLevel
- )
- }
-
- it("should set proper kafka bootstrap servers") {
- assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
- }
-
- it("should set proper listen port") {
- assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
- }
-
-
- it("should set default listen address") {
- assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
- }
-
- it("should set proper health check api port") {
- assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt())
- }
-
- it("should set default health check api address") {
- assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
- }
-
- it("should set proper first request delay") {
- assertThat(result.configurationProviderParams.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
- }
-
- it("should set proper request interval") {
- assertThat(result.configurationProviderParams.requestInterval)
- .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
- }
-
- it("should set proper security configuration") {
- assertThat(result.securityConfiguration.keys.isEmpty()).isFalse()
-
- val keys = result.securityConfiguration.keys.orNull() as SecurityKeys
- assertNotNull(keys.keyStore())
- assertNotNull(keys.trustStore())
- keys.keyStorePassword().useChecked {
- assertThat(it).isEqualTo(keyStorePassword.toCharArray())
-
- }
- keys.trustStorePassword().useChecked {
- assertThat(it).isEqualTo(trustStorePassword.toCharArray())
- }
- }
-
- it("should set proper log level") {
- assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG)
- }
- }
-
- describe("required parameter is absent") {
- on("missing listen port") {
- it("should throw exception") {
- assertThat(
- cut.parseExpectingFailure(
- "--ssl-disable",
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval
- )
- ).isInstanceOf(WrongArgumentError::class.java)
- }
- }
- on("missing configuration url") {
- it("should throw exception") {
- assertThat(
- cut.parseExpectingFailure(
- "--listen-port", listenPort,
- "--ssl-disable",
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval
- )
- ).isInstanceOf(WrongArgumentError::class.java)
- }
- }
- }
-
- describe("correct log level not provided") {
- on("missing log level") {
- it("should set default INFO value") {
- val config = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword
- )
-
- assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
- }
- }
-
- on("incorrect log level") {
- it("should set default INFO value") {
- val config = cut.parseExpectingSuccess(
- "--kafka-bootstrap-servers", kafkaBootstrapServers,
- "--health-check-api-port", healthCheckApiPort,
- "--listen-port", listenPort,
- "--first-request-delay", firstRequestDelay,
- "--request-interval", requestInterval,
- "--key-store", "/tmp/keys.p12",
- "--trust-store", "/tmp/trust.p12",
- "--key-store-password", keyStorePassword,
- "--trust-store-password", trustStorePassword,
- "--log-level", "1"
- )
-
- assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
- }
- }
- }
- }
-})
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt
new file mode 100644
index 00000000..dbe757c4
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ArgHvVesConfigurationTest.kt
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
+import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
+import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
+import java.io.File
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ArgVesHvConfigurationTest : Spek({
+ lateinit var cut: ArgHvVesConfiguration
+ val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json")
+
+ beforeEachTest {
+ cut = ArgHvVesConfiguration()
+ }
+
+ describe("parsing arguments") {
+ given("all parameters are present in the long form") {
+ lateinit var result: File
+
+ beforeEachTest {
+ result = cut.parseExpectingSuccess(
+ "--configuration-file", configFilePath
+ )
+ }
+
+ it("should read proper configuration file") {
+ assertThat(result.exists()).isTrue()
+ }
+ }
+
+ describe("required parameter is absent") {
+ on("missing configuration file path") {
+ it("should throw exception") {
+ assertThat(
+ cut.parseExpectingFailure(
+ "--non-existing-option", ""
+ )
+ ).isInstanceOf(WrongArgumentError::class.java)
+ }
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
new file mode 100644
index 00000000..12396d23
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -0,0 +1,138 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
+import java.time.Duration
+
+internal object ConfigurationValidatorTest : Spek({
+ describe("ConfigurationValidator") {
+ val cut = ConfigurationValidator()
+
+ describe("validating partial configuration with missing fields") {
+ val config = PartialConfiguration(
+ Some(PartialServerConfig(listenPort = Some(1)))
+ )
+
+ it("should return ValidationError") {
+ val result = cut.validate(config)
+ assertThat(result.isLeft()).isTrue()
+ }
+ }
+
+ describe("validating configuration with empty log level") {
+ val config = PartialConfiguration(
+ Some(PartialServerConfig(
+ Some(1),
+ Some(2),
+ Some(3)
+ )),
+ Some(PartialCbsConfig(
+ Some(5),
+ Some(3)
+ )),
+ Some(PartialSecurityConfig(
+ Some(mock())
+ )),
+ Some(PartialCollectorConfig(
+ Some(true),
+ Some(4),
+ Some(emptyList()),
+ Some(routing { }.build())
+ )),
+ None
+ )
+
+ it("should use default log level") {
+ val result = cut.validate(config)
+ result.fold(
+ {
+ fail("Configuration should have been created successfully")
+ },
+ {
+ assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
+ }
+ )
+ }
+ }
+
+ describe("validating complete configuration") {
+ val idleTimeoutSec = 10
+ val firstReqDelaySec = 10
+ val securityKeys = Some(mock<SecurityKeys>())
+ val routing = routing { }.build()
+
+ val config = PartialConfiguration(
+ Some(PartialServerConfig(
+ Some(1),
+ Some(idleTimeoutSec),
+ Some(2)
+ )),
+ Some(PartialCbsConfig(
+ Some(firstReqDelaySec),
+ Some(3)
+ )),
+ Some(PartialSecurityConfig(
+ securityKeys
+ )),
+ Some(PartialCollectorConfig(
+ Some(true),
+ Some(4),
+ Some(emptyList()),
+ Some(routing)
+ )),
+ Some(LogLevel.INFO)
+ )
+
+ it("should create valid configuration") {
+ val result = cut.validate(config)
+ result.fold(
+ {
+ fail("Configuration should have been created successfully")
+ },
+ {
+ assertThat(it.server.idleTimeout)
+ .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+
+ assertThat(it.security.keys)
+ .isEqualTo(securityKeys)
+
+ assertThat(it.cbs.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+
+ assertThat(it.collector.routing)
+ .isEqualTo(routing)
+ }
+ )
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
index 55da7d02..ab99f13c 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/FileConfigurationReaderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
@@ -17,21 +17,16 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.config.api
+package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Some
-import org.jetbrains.spek.api.Spek
import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
-import org.onap.dcae.collectors.veshv.config.impl.PartialCbsConfig
-import org.onap.dcae.collectors.veshv.config.impl.PartialKafkaConfig
-import org.onap.dcae.collectors.veshv.config.impl.PartialSecurityConfig
-import org.onap.dcae.collectors.veshv.config.impl.PartialServerConfig
+import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import java.io.InputStreamReader
import java.io.StringReader
import java.net.InetSocketAddress
@@ -41,12 +36,13 @@ import java.net.InetSocketAddress
*/
internal object FileConfigurationReaderTest : Spek({
describe("A configuration loader utility") {
+ val cut = FileConfigurationReader()
describe("partial configuration loading") {
it("parses enumerations") {
val input = """{"logLevel":"ERROR"}"""
- val config = FileConfigurationReader().loadConfig(StringReader(input))
+ val config = cut.loadConfig(StringReader(input))
assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
}
@@ -58,14 +54,13 @@ internal object FileConfigurationReaderTest : Spek({
}
}
""".trimIndent()
- val config = FileConfigurationReader().loadConfig(StringReader(input))
+ val config = cut.loadConfig(StringReader(input))
assertThat(config.server.nonEmpty()).isTrue()
- assertThat(config.server.orNull()?.healthCheckApiPort).isEqualTo(Some(12002))
assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003))
}
it("parses ip address") {
- val input = """{ "kafka" : {
+ val input = """{ "collector" : {
"kafkaServers": [
"192.168.255.1:5005",
"192.168.255.26:5006"
@@ -73,13 +68,13 @@ internal object FileConfigurationReaderTest : Spek({
}
}"""
- val config = FileConfigurationReader().loadConfig(StringReader(input))
- assertThat(config.kafka.nonEmpty()).isTrue()
- val kafka = config.kafka.orNull() as PartialKafkaConfig
- assertThat(kafka.kafkaServers.nonEmpty()).isTrue()
- val addresses = kafka.kafkaServers.orNull() as Array<InetSocketAddress>
+ val config = cut.loadConfig(StringReader(input))
+ assertThat(config.collector.nonEmpty()).isTrue()
+ val collector = config.collector.orNull() as PartialCollectorConfig
+ assertThat(collector.kafkaServers.nonEmpty()).isTrue()
+ val addresses = collector.kafkaServers.orNull() as List<InetSocketAddress>
assertThat(addresses)
- .isEqualTo(arrayOf(
+ .isEqualTo(listOf(
InetSocketAddress("192.168.255.1", 5005),
InetSocketAddress("192.168.255.26", 5006)
))
@@ -87,7 +82,7 @@ internal object FileConfigurationReaderTest : Spek({
it("parses routing array with RoutingAdapter") {
val input = """{
- "kafka" : {
+ "collector" : {
"routing" : [
{
"fromDomain": "perf3gpp",
@@ -96,30 +91,38 @@ internal object FileConfigurationReaderTest : Spek({
]
}
}""".trimIndent()
- val config = FileConfigurationReader().loadConfig(StringReader(input))
- assertThat(config.kafka.nonEmpty()).isTrue()
- val kafka = config.kafka.orNull() as PartialKafkaConfig
- assertThat(kafka.routing.nonEmpty()).isTrue()
- val routing = kafka.routing.orNull() as Routing
+ val config = cut.loadConfig(StringReader(input))
+ assertThat(config.collector.nonEmpty()).isTrue()
+ val collector = config.collector.orNull() as PartialCollectorConfig
+ assertThat(collector.routing.nonEmpty()).isTrue()
+ val routing = collector.routing.orNull() as Routing
routing.run {
assertThat(routes.size).isEqualTo(1)
assertThat(routes[0].domain).isEqualTo("perf3gpp")
assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP")
}
}
+
+ it("parses invalid log level string to empty option") {
+ val input = """{
+ "logLevel": something
+ }""".trimMargin()
+ val config = cut.loadConfig(input.reader())
+
+ assertThat(config.logLevel.isEmpty())
+ }
}
describe("complete file loading") {
it("loads actual file") {
- val config = FileConfigurationReader().loadConfig(
- InputStreamReader(
- FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")))
+ val config = cut.loadConfig(
+ javaClass.resourceAsStream("/sampleConfig.json"))
+
assertThat(config).isNotNull
assertThat(config.logLevel).isEqualTo(Some(LogLevel.ERROR))
assertThat(config.security.nonEmpty()).isTrue()
val security = config.security.orNull() as PartialSecurityConfig
- assertThat(security.sslDisable.orNull()).isFalse()
assertThat(security.keys.nonEmpty()).isTrue()
assertThat(config.cbs.nonEmpty()).isTrue()
@@ -127,21 +130,24 @@ internal object FileConfigurationReaderTest : Spek({
assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7))
assertThat(cbs.requestIntervalSec).isEqualTo(Some(900))
- assertThat(config.kafka.nonEmpty()).isTrue()
- val kafka = config.kafka.orNull() as PartialKafkaConfig
- assertThat(kafka.kafkaServers.nonEmpty()).isTrue()
- assertThat(kafka.routing.nonEmpty()).isTrue()
+ assertThat(config.collector.nonEmpty()).isTrue()
+ val collector = config.collector.orNull() as PartialCollectorConfig
+ collector.run {
+ assertThat(dummyMode).isEqualTo(Some(false))
+ assertThat(maxRequestSizeBytes).isEqualTo(Some(512000))
+ assertThat(kafkaServers.nonEmpty()).isTrue()
+ assertThat(routing.nonEmpty()).isTrue()
+ }
assertThat(config.server.nonEmpty()).isTrue()
val server = config.server.orNull() as PartialServerConfig
server.run {
- assertThat(dummyMode).isEqualTo(Some(false))
- assertThat(healthCheckApiPort).isEqualTo(Some(5000))
assertThat(idleTimeoutSec).isEqualTo(Some(1200))
assertThat(listenPort).isEqualTo(Some(6000))
- assertThat(maximumPayloadSizeBytes).isEqualTo(Some(512000))
+ assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000))
}
}
}
}
-}) \ No newline at end of file
+})
+
diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
index b64df05a..b49085e8 100644
--- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
+++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
@@ -1,16 +1,16 @@
{
- "server" : {
- "healthCheckApiPort" : 5000,
- "listenPort" : 6000,
- "idleTimeoutSec" : 1200,
- "maximumPayloadSizeBytes" : 512000,
- "dummyMode" : false
+ "logLevel": "ERROR",
+ "server": {
+ "healthCheckApiPort": 5000,
+ "listenPort": 6000,
+ "idleTimeoutSec": 1200,
+ "maxPayloadSizeBytes": 512000
},
- "cbs" : {
+ "cbs": {
"firstRequestDelaySec": 7,
"requestIntervalSec": 900
},
- "security" : {
+ "security": {
"sslDisable": false,
"keys": {
"keyStoreFile": "test.ks.pkcs12",
@@ -19,7 +19,9 @@
"trustStorePassword": "changeMeToo"
}
},
- "kafka" : {
+ "collector": {
+ "dummyMode": false,
+ "maxRequestSizeBytes": 512000,
"kafkaServers": [
"192.168.255.1:5005",
"192.168.255.1:5006"
@@ -30,6 +32,5 @@
"toTopic": "HV_VES_PERF3GPP"
}
]
- },
- "logLevel" : "ERROR"
+ }
} \ No newline at end of file
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 84310802..782d2324 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,13 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.utils.Closeable
import reactor.core.publisher.Flux
@@ -48,5 +48,5 @@ interface SinkProvider : Closeable {
}
interface ConfigurationProvider {
- operator fun invoke(): Flux<CollectorConfiguration>
+ operator fun invoke(): Flux<Routing>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 3ea14385..c08df748 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -20,13 +20,12 @@
package org.onap.dcae.collectors.veshv.factory
import arrow.core.Option
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val maximumPayloadSizeBytes: Int,
+ private val maxPayloadSizeBytes: Int,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config: AtomicReference<CollectorConfiguration> = AtomicReference()
+ val config = AtomicReference<Routing>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(config.routing, ctx),
+ router = Router(routing, ctx),
sink = sinkProvider(ctx),
metrics = metrics)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index 58a8599a..6c4e4671 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
/**
@@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
* @since May 2018
*/
object ServerFactory {
- fun createNettyTcpServer(serverConfiguration: ServerConfiguration,
+
+ private val sslFactory = SslContextFactory()
+
+ fun createNettyTcpServer(serverConfig: ServerConfiguration,
+ securityConfig: SecurityConfiguration,
collectorProvider: CollectorProvider,
- metrics: Metrics): Server =
- NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics)
+ metrics: Metrics
+ ): Server = NettyTcpServer(
+ serverConfig,
+ sslFactory.createServerContext(securityConfig),
+ collectorProvider,
+ metrics
+ )
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index a853839a..c362020e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
@@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(dummyMode: Boolean,
- kafkaConfig: KafkaConfiguration): SinkProvider =
- if (dummyMode)
+ fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
+ if (config.dummyMode)
LoggingSinkProvider()
else
- KafkaSinkProvider(kafkaConfig)
+ KafkaSinkProvider(config)
- fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
ConfigurationProviderImpl(
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- configurationProviderParams)
+ config)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index 51b6d4f0..754a2efc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
@@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
retrySpec: Retry<Any>
) : ConfigurationProvider {
- constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+ constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
cbsClientMono,
params.firstRequestDelay,
params.requestInterval,
@@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<CollectorConfiguration> =
+ override fun invoke(): Flux<Routing> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
.updates(RequestDiagnosticContext.create(),
firstRequestDelay,
requestInterval)
@@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.retryWhen(retry)
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ private fun createCollectorConfiguration(configuration: JsonObject): Routing =
try {
val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
- CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject
- defineRoute {
- fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- )
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject
+ defineRoute {
+ fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
} catch (e: NullPointerException) {
throw ParsingException("Failed to parse configuration", e)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index f52890b0..96e45a02 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
@@ -46,7 +46,7 @@ import java.lang.Integer.max
internal class KafkaSinkProvider internal constructor(
private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
- constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+ constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
@@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor(
private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
private const val BUFFER_MEMORY_MULTIPLIER = 32
private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
- private fun constructKafkaSender(config: KafkaConfiguration) =
+
+ private fun constructKafkaSender(config: CollectorConfiguration) =
KafkaSender.create(constructSenderOptions(config))
- private fun constructSenderOptions(config: KafkaConfiguration) =
+ private fun constructSenderOptions(config: CollectorConfiguration) =
SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
- .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config))
- .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config))
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
+ .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
+ .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
.producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
.producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
.producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
@@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor(
.producerProperty(ACKS_CONFIG, "1")
.stopOnError(false)
- private fun maxRequestSize(config: KafkaConfiguration) =
- (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt()
+ private fun maxRequestSize(maxRequestSizeBytes: Int) =
+ (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
- private fun bufferMemory(config: KafkaConfiguration) =
- max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes)
+ private fun bufferMemory(maxRequestSizeBytes: Int) =
+ max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 123956ad..fab96560 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.core.Option
import arrow.core.getOrElse
import arrow.effects.IO
+import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -41,6 +42,7 @@ import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
import java.net.InetAddress
+import java.net.InetSocketAddress
import java.time.Duration
@@ -48,14 +50,14 @@ import java.time.Duration
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
- private val sslContextFactory: SslContextFactory,
+internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
+ private val sslContext: Option<SslContext>,
private val collectorProvider: CollectorProvider,
private val metrics: Metrics) : Server {
override fun start(): IO<ServerHandle> = IO {
TcpServer.create()
- .addressSupplier { serverConfig.serverListenAddress }
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
.configureSsl()
.handle(this::handleConnection)
.doOnUnbound {
@@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
}
private fun TcpServer.configureSsl() =
- sslContextFactory
- .createServerContext(serverConfig.securityConfiguration)
- .map { sslContext ->
+ sslContext
+ .map { serverContext ->
logger.info { "Collector configured with SSL enabled" }
- this.secure { b -> b.sslContext(sslContext) }
+ this.secure { it.sslContext(serverContext) }
}.getOrElse {
logger.info { "Collector configured with SSL disabled" }
this
@@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
withConnectionFrom(nettyInbound) { connection ->
connection
- .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
.logConnectionClosed(clientContext)
}.run {
collector.handleConnection(nettyInbound.createDataStream())
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index 21aaa129..f830f2c9 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val route1 = it.routing.routes[0]
+ val route1 = it.routes[0]
assertThat(FAULT.domainName)
.describedAs("routed domain 1")
.isEqualTo(route1.domain)
@@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({
.describedAs("target topic 1")
.isEqualTo(route1.targetTopic)
- val route2 = it.routing.routes[1]
+ val route2 = it.routes[1]
assertThat(HEARTBEAT.domainName)
.describedAs("routed domain 2")
.isEqualTo(route2.domain)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index 068476ad..1e3f2e7a 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.ves.VesEventOuterClass
import reactor.kafka.sender.KafkaSender
@@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender
internal object KafkaSinkProviderTest : Spek({
describe("non functional requirements") {
given("sample configuration") {
- val config = KafkaConfiguration("localhost:9090",
- 1024 * 1024)
+ val config = CollectorConfiguration(
+ dummyMode = false,
+ maxRequestSizeBytes = 1024 * 1024,
+ kafkaServers = "localhost:9090",
+ routing = routing { }.build())
+
val cut = KafkaSinkProvider(config)
on("sample clients") {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index aaa3ee3b..bd056d4d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+ val sut = vesHvWithAlwaysFailingSink(basicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index dc5fe60b..ece42285 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index d97541ba..e84e9486 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,11 +27,18 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
+import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -40,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()): AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : AutoCloseable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -94,17 +101,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(collectorConfiguration)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(collectorConfiguration)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(collectorConfiguration)
+ configurationProvider.updateConfiguration(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index ed46b119..17f6ce32 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,21 +24,24 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.*
-
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
import reactor.core.publisher.Flux
import java.time.Duration
@@ -149,7 +152,7 @@ object VesHvSpecification : Spek({
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -210,12 +213,12 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+ sut.configurationProvider.updateConfiguration(emptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
@@ -317,7 +320,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -346,6 +349,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index c7e12bbd..1ad2b0e3 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,12 +20,11 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
@@ -35,62 +34,55 @@ const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
-
-val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(HEARTBEAT.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(MEASUREMENT.domainName)
- toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
+val basicRouting = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+}.build()
-val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(ALTERNATE_PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
+val twoDomainsToOneTopicRouting = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(HEARTBEAT.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(MEASUREMENT.domainName)
+ toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ withFixedPartitioning()
+ }
+}.build()
+
+
+val configurationWithDifferentRouting = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(ALTERNATE_PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+}.build()
+
+val emptyRouting = routing { }.build()
-val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- }.build()
-)
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
- fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+ fun updateConfiguration(routing: Routing) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(collectorConfiguration)
+ configStream.onNext(routing)
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index dc9be16b..f6d1eab7 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -20,9 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
import arrow.core.Option
-import arrow.core.fix
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
@@ -34,6 +31,7 @@ import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.MAXIMUM_PAYL
import org.onap.dcae.collectors.veshv.commandline.intValue
import org.onap.dcae.collectors.veshv.commandline.stringValue
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import java.net.InetSocketAddress
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
@@ -45,7 +43,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
)
override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
- Option.monad().binding {
+ binding {
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
@@ -62,5 +60,5 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
maxPayloadSizeBytes,
kafkaBootstrapServers,
kafkaTopics)
- }.fix()
+ }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 39fcae21..c8a3c013 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -19,58 +19,58 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.neverComplete
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.scheduler.Schedulers
+import java.util.concurrent.atomic.AtomicReference
-private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv"
-private val logger = Logger("$VESHV_PACKAGE.main")
-private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
-fun main(args: Array<String>) =
- ConfigurationModule()
- .createConfigurationFromCommandLine(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startAndAwaitServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } }
- )
+private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
+private val logger = Logger("$VES_HV_PACKAGE.main")
-private fun startAndAwaitServers(config: ServerConfiguration) =
- IO.monad().binding {
- Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
- logger.info { "Using configuration: $config" }
+private val hvVesServer = AtomicReference<ServerHandle>()
- val healthCheckServerHandle = HealthCheckServer.start(config).bind()
- val hvVesHandle = VesServer.start(config).bind()
+fun main(args: Array<String>) {
+ HealthCheckServer.start()
+ ConfigurationModule()
+ .hvVesConfigurationUpdates(args)
+ .publishOn(Schedulers.single(Schedulers.elastic()))
+ .doOnNext(::startServer)
+ .doOnError(::logServerStartFailed)
+ .neverComplete() // TODO: remove after merging configuration stream with cbs
+ .block()
+}
+
+private fun startServer(config: HvVesConfiguration) {
+ stopRunningServer()
+ Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
+ logger.info { "Using configuration: $config" }
+
+ VesServer.start(config).let {
+ registerShutdownHook { shutdownGracefully(it) }
+ hvVesServer.set(it)
+ }
+}
- registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle))
- hvVesHandle.await().bind()
- }.fix()
+private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
-internal fun closeServers(vararg handles: ServerHandle,
- healthState: HealthState = HealthState.INSTANCE) = {
+internal fun shutdownGracefully(serverHandle: ServerHandle,
+ healthState: HealthState = HealthState.INSTANCE) {
logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
healthState.changeState(HealthDescription.SHUTTING_DOWN)
- Closeable.closeAll(handles.asIterable()).unsafeRunSync()
+ serverHandle.close().unsafeRunSync()
logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
+
+private fun logServerStartFailed(ex: Throwable) =
+ logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
index 15472b5e..bc284d08 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -19,25 +19,38 @@
*/
package org.onap.dcae.collectors.veshv.main.servers
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.net.InetSocketAddress
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-object HealthCheckServer : ServerStarter() {
- override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+object HealthCheckServer {
- private fun createHealthCheckServer(config: ServerConfiguration) =
+ private const val DEFAULT_HEALTHCHECK_PORT = 6060
+ private val logger = Logger(HealthCheckServer::class)
+
+ fun start(port: Int = DEFAULT_HEALTHCHECK_PORT) =
+ createHealthCheckServer(port)
+ .start()
+ .then(::logServerStarted)
+ .unsafeRunSync()
+
+ private fun createHealthCheckServer(listenPort: Int) =
HealthCheckApiServer(
HealthState.INSTANCE,
MicrometerMetrics.INSTANCE.metricsProvider,
- config.healthCheckApiListenAddress)
+ InetSocketAddress(listenPort))
- override fun serverStartedMessage(handle: ServerHandle) =
- "Health check server is up and listening on ${handle.host}:${handle.port}"
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info(ServiceContext::mdc) {
+ "Health check server is up and listening on ${handle.host}:${handle.port}"
+ }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
deleted file mode 100644
index 74a66324..00000000
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.main.servers
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-abstract class ServerStarter {
- fun start(config: ServerConfiguration): IO<ServerHandle> =
- startServer(config)
- .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
-
- protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
- protected abstract fun serverStartedMessage(handle: ServerHandle): String
-
- companion object {
- private val logger = Logger(ServerStarter::class)
- }
-}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index 0f5e45ec..d15dccef 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -19,33 +19,54 @@
*/
package org.onap.dcae.collectors.veshv.main.servers
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-object VesServer : ServerStarter() {
- override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
-
- private fun createVesServer(config: ServerConfiguration): Server {
- val collectorProvider = CollectorFactory(
- AdapterFactory.configurationProvider(config.configurationProviderParams),
- AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
- MicrometerMetrics.INSTANCE,
- config.maximumPayloadSizeBytes
- ).createVesHvCollectorProvider()
-
- return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE)
- }
-
- override fun serverStartedMessage(handle: ServerHandle) =
- "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+object VesServer {
+
+ private val logger = Logger(VesServer::class)
+
+ fun start(config: HvVesConfiguration): ServerHandle =
+ createVesServer(config)
+ .start()
+ .then(::logServerStarted)
+ .unsafeRunSync()
+
+ private fun createVesServer(config: HvVesConfiguration): Server =
+ initializeCollectorFactory(config)
+ .createVesHvCollectorProvider()
+ .let { collectorProvider ->
+ ServerFactory.createNettyTcpServer(
+ config.server,
+ config.security,
+ collectorProvider,
+ MicrometerMetrics.INSTANCE
+ )
+ }
+
+ private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
+ CollectorFactory(
+ AdapterFactory.configurationProvider(config.cbs),
+ AdapterFactory.sinkCreatorFactory(config.collector),
+ MicrometerMetrics.INSTANCE,
+ config.server.maxPayloadSizeBytes
+ )
+
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info(ServiceContext::mdc) {
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+ }
+
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index e18b0b10..d8de9f25 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -42,7 +42,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
internal object MainTest : Spek({
describe("closeServer shutdown hook") {
given("server handles and health state") {
- val handle: ServerHandle = mock()
+ val handle = mock<ServerHandle>()
var closed = false
val handleClose = IO {
closed = true
@@ -50,8 +50,8 @@ internal object MainTest : Spek({
whenever(handle.close()).thenReturn(handleClose)
val healthState: HealthState = mock()
- on("closeServers") {
- closeServers(handle, healthState = healthState).invoke()
+ on("shutdownGracefully") {
+ shutdownGracefully(handle, healthState = healthState)
it("should close all handles") {
assertThat(closed).isTrue()
diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt
index fb211115..579eb84c 100644
--- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt
+++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SecurityConfiguration.kt
@@ -21,11 +21,9 @@ package org.onap.dcae.collectors.veshv.ssl.boundary
import arrow.core.Option
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.nio.file.Path
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class SecurityConfiguration(
- val keys: Option<SecurityKeys>)
+data class SecurityConfiguration(val keys: Option<SecurityKeys>)
diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt
index 805d94d2..f72ddecb 100644
--- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt
+++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/SslContextFactory.kt
@@ -30,7 +30,7 @@ import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory
class SslContextFactory(private val sslFactory: SslFactory = SslFactory()) {
fun createServerContext(secConfig: SecurityConfiguration): Option<SslContext> =
secConfig.keys.map { sslFactory.createSecureServerContext(it) }
+
fun createClientContext(secConfig: SecurityConfiguration): Option<SslContext> =
secConfig.keys.map { sslFactory.createSecureClientContext(it) }
-
}
diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt
index f8632350..822d84f1 100644
--- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt
+++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt
@@ -51,7 +51,7 @@ fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> Securit
private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE)
-private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None)
+private fun disabledSecurityConfiguration() = SecurityConfiguration(None)
private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE)
@@ -66,8 +66,7 @@ private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfigur
.trustStorePassword(Passwords.fromString(tsPass))
.build()
- return SecurityConfiguration(keys = Some(keys))
+ return SecurityConfiguration(Some(keys))
}
-
private fun pathFromFile(file: String) = Paths.get(file)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt
index 847279d9..24fedbbc 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/ConfigurationProviderParams.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/resources.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,13 +17,12 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.config.api.model
+package org.onap.dcae.collectors.veshv.tests.utils
-import java.time.Duration
+import java.io.InputStreamReader
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-data class ConfigurationProviderParams(val firstRequestDelay: Duration,
- val requestInterval: Duration)
+fun <T> Class<T>.resourceAsStream(resourcePath: String): InputStreamReader =
+ InputStreamReader(getResourceAsStream(resourcePath))
+
+fun <T> Class<T>.absoluteResourcePath(resourcePath: String): String =
+ getResource(resourcePath).path
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index bedc2fcd..d5b33b91 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -20,10 +20,20 @@
package org.onap.dcae.collectors.veshv.utils.arrow
import arrow.core.Either
+import arrow.core.ForOption
import arrow.core.Option
import arrow.core.Try
+import arrow.core.fix
import arrow.core.identity
+import arrow.effects.ForIO
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monad.monad
+import arrow.instances.option.monad.monad
import arrow.syntax.collections.firstOption
+import arrow.typeclasses.MonadContinuation
+import arrow.typeclasses.binding
+import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicReference
/**
@@ -31,12 +41,24 @@ import java.util.concurrent.atomic.AtomicReference
* @since July 2018
*/
+object OptionUtils {
+ fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A)
+ : Option<A> = Option.monad().binding(c).fix()
+}
+
+object IOUtils {
+ fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A)
+ : IO<A> = IO.monad().binding(c).fix()
+}
+
fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
+fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) }
+
fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
@@ -57,3 +79,13 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply {
action(exception)
}
}
+
+fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B)
+ : Option<B> = let { OptionUtils.binding { c(it) } }
+
+
+
+
+
+
+
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
index f9dff203..aaa598d2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/KafkaConfiguration.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,10 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.config.api.model
+package org.onap.dcae.collectors.veshv.utils
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-data class KafkaConfiguration(val bootstrapServers: String, val maximalRequestSizeBytes: Int)
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then() \ No newline at end of file
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
index 87aea41e..cc940907 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/shutdown_hook.kt
@@ -19,10 +19,19 @@
*/
package org.onap.dcae.collectors.veshv.utils
+import java.util.concurrent.atomic.AtomicReference
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since January 2019
*/
-fun registerShutdownHook(job: () -> Unit) =
- Runtime.getRuntime()
- .addShutdownHook(Thread({ job() }, "GracefulShutdownThread"))
+
+private val currentShutdownHook = AtomicReference<Thread>()
+
+fun registerShutdownHook(job: () -> Unit) {
+ val runtime = Runtime.getRuntime()
+ val newShutdownHook = Thread({ job() }, "GracefulShutdownThread")
+ currentShutdownHook.get()?.run(runtime::removeShutdownHook)
+ currentShutdownHook.set(newShutdownHook)
+ runtime.addShutdownHook(newShutdownHook)
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index 08c05e05..28cc0556 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -20,9 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import arrow.core.Option
-import arrow.core.fix
-import arrow.instances.option.monad.monad
-import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
@@ -40,6 +37,7 @@ import org.onap.dcae.collectors.veshv.commandline.intValue
import org.onap.dcae.collectors.veshv.commandline.stringValue
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.net.InetSocketAddress
@@ -62,7 +60,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
TRUST_STORE_PASSWORD)
override fun getConfiguration(cmdLine: CommandLine): Option<SimulatorConfiguration> =
- Option.monad().binding {
+ binding {
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
@@ -86,7 +84,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
InetSocketAddress(vesHost, vesPort),
maxPayloadSizeBytes,
security)
- }.fix()
+ }
internal object DefaultValues {
const val HEALTH_CHECK_API_PORT = 6063
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index a73b39b1..a1042f3e 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -20,9 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.xnf
import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.typeclasses.binding
import io.vavr.collection.HashSet
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
@@ -36,6 +33,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfigura
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -62,7 +60,7 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
)
private fun startServers(config: SimulatorConfiguration): IO<Unit> =
- IO.monad().binding {
+ binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
@@ -79,5 +77,5 @@ private fun startServers(config: SimulatorConfiguration): IO<Unit> =
HealthState.INSTANCE.changeState(HealthDescription.IDLE)
xnfApiServerHandler.await().bind()
- }.fix()
+ }