aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-configuration/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-configuration/src')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt3
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt57
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt97
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt37
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt14
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt8
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt48
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt)12
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt43
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt102
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt46
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt61
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt77
-rw-r--r--sources/hv-collector-configuration/src/test/resources/sampleConfig.json16
15 files changed, 267 insertions, 358 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index 9f8c552b..efe0aa88 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -34,10 +34,14 @@ class ConfigurationModule {
private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
+ private lateinit var initialConfig: HvVesConfiguration
+
fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
Flux.just(cmd.parse(args))
.throwOnLeft { MissingArgumentException(it.message, it.cause) }
.map { it.reader().use(configReader::loadConfig) }
.map { configValidator.validate(it) }
.throwOnLeft { ValidationException(it.message) }
+ .doOnNext { initialConfig = it }
+
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
index 566f2c08..3375821e 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
@@ -49,6 +49,5 @@ data class CbsConfiguration(
data class CollectorConfiguration(
val maxRequestSizeBytes: Int,
val kafkaServers: String,
- val routing: Routing,
- val dummyMode: Boolean = false
+ val routing: Routing
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
index aab8ecad..e5a83ac4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
@@ -19,59 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.config.api.model
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-data class Routing(val routes: List<Route>) {
+data class Route(val domain: String, val sink: KafkaSink)
- fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) })
-}
-
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
-
- fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
-
- operator fun invoke(message: VesMessage): RoutedMessage =
- RoutedMessage(targetTopic, partitioning(message.header), message)
-}
-
-
-/*
-HvVesConfiguration DSL
-*/
-
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
-
-class RoutingBuilder {
- private val routes: MutableList<RouteBuilder> = mutableListOf()
-
- fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
- .apply(init)
- .also { routes.add(it) }
-
- fun build() = Routing(routes.map { it.build() }.toList())
-}
-
-class RouteBuilder {
-
- private lateinit var domain: String
- private lateinit var targetTopic: String
- private lateinit var partitioning: (CommonEventHeader) -> Int
-
- fun fromDomain(domain: String): RouteBuilder = apply {
- this.domain = domain
- }
-
- fun toTopic(targetTopic: String): RouteBuilder = apply {
- this.targetTopic = targetTopic
- }
-
- fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
- partitioning = { num }
- }
-
- fun build() = Route(domain, targetTopic, partitioning)
-}
+typealias Routing = List<Route>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
new file mode 100644
index 00000000..f044492c
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -0,0 +1,97 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+
+/**
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since March 2019
+ */
+internal class ConfigurationMerger {
+ fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration =
+ PartialConfiguration(
+ mergeServerConfig(base.server, update.server),
+ mergeCbsConfig(base.cbs, update.cbs),
+ mergeSecurityConfig(base.security, update.security),
+ mergeCollectorConfig(base.collector, update.collector),
+ mergeLogLevel(base.logLevel, update.logLevel)
+ )
+
+
+ private fun mergeServerConfig(baseOption: Option<PartialServerConfig>,
+ updateOption: Option<PartialServerConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialServerConfig(
+ base.listenPort.updateToGivenOrNone(update.listenPort),
+ base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
+ base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes)
+ )
+ }
+
+
+ private fun mergeCbsConfig(baseOption: Option<PartialCbsConfig>,
+ updateOption: Option<PartialCbsConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialCbsConfig(
+ base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
+ base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec)
+ )
+ }
+
+ private fun mergeSecurityConfig(baseOption: Option<PartialSecurityConfig>,
+ updateOption: Option<PartialSecurityConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialSecurityConfig(
+ base.keys.updateToGivenOrNone(update.keys)
+ )
+ }
+
+ private fun mergeCollectorConfig(baseOption: Option<PartialCollectorConfig>,
+ updateOption: Option<PartialCollectorConfig>) =
+ applyUpdate(baseOption, updateOption) { base, update ->
+ PartialCollectorConfig(
+ base.maxRequestSizeBytes.updateToGivenOrNone(update.maxRequestSizeBytes),
+ base.kafkaServers.updateToGivenOrNone(update.kafkaServers),
+ base.routing.updateToGivenOrNone(update.routing)
+ )
+ }
+
+
+ private fun mergeLogLevel(base: Option<LogLevel>, update: Option<LogLevel>) =
+ base.updateToGivenOrNone(update)
+}
+
+private fun <T> applyUpdate(base: Option<T>, update: Option<T>, overrider: (base: T, update: T) -> T) =
+ when {
+ base is Some && update is Some -> overrider(base.t, update.t).toOption()
+ base is Some && update is None -> base
+ base is None && update is Some -> update
+ else -> None
+ }
+
+private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
+ update.getOrElse(this::orNull).toOption()
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 90be3dbd..04bba7e2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Either
import arrow.core.None
import arrow.core.Option
-import arrow.core.Some
import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
@@ -54,14 +53,20 @@ internal class ConfigurationValidator {
val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys)
- val collectorConfiguration = partialConfig.collector.bind()
- .let { createCollectorConfig(it).bind() }
+// TOD0: retrieve when ConfigurationMerger is implemented
+// val collectorConfiguration = partialConfig.collector.bind()
+// .let { createCollectorConfig(it).bind() }
HvVesConfiguration(
serverConfiguration,
cbsConfiguration,
securityConfiguration,
- collectorConfiguration,
+// TOD0: swap when ConfigurationMerger is implemented
+// collectorConfiguration
+ CollectorConfiguration(-1,
+ "I do not exist. I'm not even a URL :o",
+ emptyList()),
+// end TOD0
logLevel
)
}.toEither { ValidationError("Some required configuration options are missing") }
@@ -79,7 +84,7 @@ internal class ConfigurationValidator {
partial.mapBinding {
ServerConfiguration(
it.listenPort.bind(),
- Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()),
+ it.idleTimeoutSec.bind(),
it.maxPayloadSizeBytes.bind()
)
}
@@ -87,20 +92,20 @@ internal class ConfigurationValidator {
private fun createCbsConfiguration(partial: PartialCbsConfig) =
partial.mapBinding {
CbsConfiguration(
- Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()),
- Duration.ofSeconds(it.requestIntervalSec.bind().toLong())
+ it.firstRequestDelaySec.bind(),
+ it.requestIntervalSec.bind()
)
}
- private fun createCollectorConfig(partial: PartialCollectorConfig) =
- partial.mapBinding {
- CollectorConfiguration(
- it.maxRequestSizeBytes.bind(),
- toKafkaServersString(it.kafkaServers.bind()),
- it.routing.bind(),
- it.dummyMode.bind()
- )
- }
+// TOD0: retrieve when ConfigurationMerger is implemented
+// private fun createCollectorConfig(partial: PartialCollectorConfig) =
+// partial.mapBinding {
+// CollectorConfiguration(
+// it.maxRequestSizeBytes.bind(),
+// toKafkaServersString(it.kafkaServers.bind()),
+// it.routing.bind()
+// )
+// }
private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String =
kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
index 1e77dde5..9513107b 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
@@ -21,16 +21,12 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.Option
import com.google.gson.GsonBuilder
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.AddressAdapter
+import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.RouteAdapter
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.RoutingAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
+
import java.io.Reader
-import java.net.InetSocketAddress
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -38,11 +34,9 @@ import java.net.InetSocketAddress
*/
internal class FileConfigurationReader {
private val gson = GsonBuilder()
- .registerTypeAdapter(InetSocketAddress::class.java, AddressAdapter())
- .registerTypeAdapter(Route::class.java, RouteAdapter())
- .registerTypeAdapter(Routing::class.java, RoutingAdapter())
.registerTypeAdapter(Option::class.java, OptionAdapter())
.registerTypeAdapter(PartialSecurityConfig::class.java, SecurityAdapter())
+ .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter())
.create()
fun loadConfig(input: Reader): PartialConfiguration =
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
index 3e6df3e0..a27998e1 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
import java.net.InetSocketAddress
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -40,19 +41,18 @@ internal data class PartialConfiguration(
internal data class PartialServerConfig(
val listenPort: Option<Int> = None,
- val idleTimeoutSec: Option<Int> = None,
+ val idleTimeoutSec: Option<Duration> = None,
val maxPayloadSizeBytes: Option<Int> = None
)
internal data class PartialCbsConfig(
- val firstRequestDelaySec: Option<Int> = None,
- val requestIntervalSec: Option<Int> = None
+ val firstRequestDelaySec: Option<Duration> = None,
+ val requestIntervalSec: Option<Duration> = None
)
internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
internal data class PartialCollectorConfig(
- val dummyMode: Option<Boolean> = None,
val maxRequestSizeBytes: Option<Int> = None,
val kafkaServers: Option<List<InetSocketAddress>> = None,
val routing: Option<Routing> = None
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt
deleted file mode 100644
index 255be03a..00000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
-import com.google.gson.JsonElement
-import com.google.gson.JsonParseException
-import java.lang.reflect.Type
-import java.net.InetSocketAddress
-
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since February 2019
- */
-internal class AddressAdapter : JsonDeserializer<InetSocketAddress> {
- override fun deserialize(
- json: JsonElement,
- typeOfT: Type,
- context: JsonDeserializationContext): InetSocketAddress {
- val portStart = json.asString.lastIndexOf(":")
- if (portStart > 0) {
- val address = json.asString.substring(0, portStart)
- val port = json.asString.substring(portStart + 1)
- return InetSocketAddress(address, port.toInt())
- } else throw InvalidAddressException("Cannot parse '" + json.asString + "' to address")
- }
-
- class InvalidAddressException(reason: String) : RuntimeException(reason)
-}
-
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
index 4b299098..99da1102 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
@@ -22,19 +22,15 @@ package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
import com.google.gson.JsonDeserializationContext
import com.google.gson.JsonDeserializer
import com.google.gson.JsonElement
-import com.google.gson.reflect.TypeToken
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import java.lang.reflect.Type
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
* @since March 2019
*/
-internal class RoutingAdapter : JsonDeserializer<Routing> {
- override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext): Routing {
- val parametrizedType = TypeToken.getParameterized(List::class.java, Route::class.java).type
- return Routing(context.deserialize<List<Route>>(json, parametrizedType))
- }
+class DurationOfSecondsAdapter : JsonDeserializer<Duration> {
+ override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) =
+ Duration.ofSeconds(json.asLong)
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt
deleted file mode 100644
index 25cb8861..00000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
-import com.google.gson.JsonElement
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.RouteBuilder
-import java.lang.reflect.Type
-
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since March 2019
- */
-internal class RouteAdapter : JsonDeserializer<Route> {
- override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext?): Route {
- val jobj = json.asJsonObject
- return RouteBuilder()
- .fromDomain(jobj["fromDomain"].asString)
- .toTopic(jobj["toTopic"].asString)
- .withFixedPartitioning()
- .build()
- }
-
-}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
new file mode 100644
index 00000000..d5b18e68
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Some
+import org.jetbrains.spek.api.Spek
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import java.io.InputStreamReader
+import java.io.Reader
+import java.time.Duration
+
+/**
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since February 2019
+ */
+internal object ConfigurationMergerTest : Spek({
+ describe("Merges partial configurations into one") {
+ it("merges single parameter into empty config") {
+ val actual = PartialConfiguration()
+ val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+ }
+
+ it("merges single embedded parameter into empty config") {
+ val actual = PartialConfiguration()
+ val serverConfig = PartialServerConfig(listenPort = Some(45))
+ val diff = PartialConfiguration(server = Some(serverConfig))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.server).isEqualTo(Some(serverConfig))
+ }
+
+ it("merges single parameter into full config") {
+ val actual = FileConfigurationReader().loadConfig(
+ InputStreamReader(
+ FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+ }
+
+ it("merges single embedded parameter into full config") {
+ val actual = FileConfigurationReader().loadConfig(
+ InputStreamReader(
+ FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+ val serverConfig = PartialServerConfig(listenPort = Some(45))
+ val diff = PartialConfiguration(server = Some(serverConfig))
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.server.orNull()?.listenPort).isEqualTo(serverConfig.listenPort)
+ assertThat(result.server.orNull()?.idleTimeoutSec?.isEmpty()).isFalse()
+ assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+ assertThat(result.server.orNull()?.maxPayloadSizeBytes?.isEmpty()).isFalse()
+ assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000))
+ }
+
+ it("merges full config into single parameter") {
+ val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+ val diff = FileConfigurationReader().loadConfig(
+ InputStreamReader(
+ FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
+
+ val result = ConfigurationMerger().merge(actual, diff)
+
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
+ assertThat(result.server.isEmpty()).isFalse()
+ assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000))
+ assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+
+ assertThat(result.security.isEmpty()).isFalse()
+ assertThat(result.cbs.isEmpty()).isFalse()
+ }
+ }
+})
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index beb5df61..4b89488b 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -28,7 +28,7 @@ import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
@@ -53,21 +53,20 @@ internal object ConfigurationValidatorTest : Spek({
val config = PartialConfiguration(
Some(PartialServerConfig(
Some(1),
- Some(2),
+ Some(Duration.ofSeconds(2)),
Some(3)
)),
Some(PartialCbsConfig(
- Some(5),
- Some(3)
+ Some(Duration.ofSeconds(5)),
+ Some(Duration.ofSeconds(3))
)),
Some(PartialSecurityConfig(
Some(mock())
)),
Some(PartialCollectorConfig(
- Some(true),
Some(4),
Some(emptyList()),
- Some(routing { }.build())
+ someFromEmptyRouting
)),
None
)
@@ -86,10 +85,9 @@ internal object ConfigurationValidatorTest : Spek({
}
describe("validating complete configuration") {
- val idleTimeoutSec = 10
- val firstReqDelaySec = 10
+ val idleTimeoutSec = Duration.ofSeconds(10L)
+ val firstReqDelaySec = Duration.ofSeconds(10L)
val securityKeys = Some(mock<SecurityKeys>())
- val routing = routing { }.build()
val config = PartialConfiguration(
Some(PartialServerConfig(
@@ -99,16 +97,15 @@ internal object ConfigurationValidatorTest : Spek({
)),
Some(PartialCbsConfig(
Some(firstReqDelaySec),
- Some(3)
+ Some(Duration.ofSeconds(3))
)),
Some(PartialSecurityConfig(
securityKeys
)),
Some(PartialCollectorConfig(
- Some(true),
Some(4),
Some(emptyList()),
- Some(routing)
+ someFromEmptyRouting
)),
Some(LogLevel.INFO)
)
@@ -121,26 +118,25 @@ internal object ConfigurationValidatorTest : Spek({
},
{
assertThat(it.server.idleTimeout)
- .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+ .isEqualTo(idleTimeoutSec)
assertThat(it.security.keys)
.isEqualTo(securityKeys)
assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+ .isEqualTo(firstReqDelaySec)
assertThat(it.collector.routing)
- .isEqualTo(routing)
+ .isEqualTo(emptyRouting)
}
)
}
}
describe("validating configuration with security disabled") {
- val idleTimeoutSec = 10
- val firstReqDelaySec = 10
+ val idleTimeoutSec = Duration.ofSeconds(10)
+ val firstReqDelaySec = Duration.ofSeconds(10)
val securityKeys: Option<SecurityKeys> = None
- val routing = routing { }.build()
val config = PartialConfiguration(
Some(PartialServerConfig(
@@ -150,16 +146,15 @@ internal object ConfigurationValidatorTest : Spek({
)),
Some(PartialCbsConfig(
Some(firstReqDelaySec),
- Some(3)
+ Some(Duration.ofSeconds(3))
)),
Some(PartialSecurityConfig(
securityKeys
)),
Some(PartialCollectorConfig(
- Some(true),
Some(4),
Some(emptyList()),
- Some(routing)
+ someFromEmptyRouting
)),
Some(LogLevel.INFO)
)
@@ -172,16 +167,16 @@ internal object ConfigurationValidatorTest : Spek({
},
{
assertThat(it.server.idleTimeout)
- .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+ .isEqualTo(idleTimeoutSec)
assertThat(it.security.keys)
.isEqualTo(securityKeys)
assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+ .isEqualTo(firstReqDelaySec)
assertThat(it.collector.routing)
- .isEqualTo(routing)
+ .isEqualTo(emptyRouting)
}
)
}
@@ -189,3 +184,6 @@ internal object ConfigurationValidatorTest : Spek({
}
})
+
+val emptyRouting: Routing = emptyList()
+val someFromEmptyRouting = Some(emptyRouting)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
index 8267e304..4e35bfb3 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
@@ -24,11 +24,11 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import java.io.StringReader
import java.net.InetSocketAddress
+import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -59,50 +59,6 @@ internal object FileConfigurationReaderTest : Spek({
assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003))
}
- it("parses ip address") {
- val input = """{ "collector" : {
- "kafkaServers": [
- "192.168.255.1:5005",
- "192.168.255.26:5006"
- ]
- }
- }"""
-
- val config = cut.loadConfig(StringReader(input))
- assertThat(config.collector.nonEmpty()).isTrue()
- val collector = config.collector.orNull() as PartialCollectorConfig
- assertThat(collector.kafkaServers.nonEmpty()).isTrue()
- val addresses = collector.kafkaServers.orNull() as List<InetSocketAddress>
- assertThat(addresses)
- .isEqualTo(listOf(
- InetSocketAddress("192.168.255.1", 5005),
- InetSocketAddress("192.168.255.26", 5006)
- ))
- }
-
- it("parses routing array with RoutingAdapter") {
- val input = """{
- "collector" : {
- "routing" : [
- {
- "fromDomain": "perf3gpp",
- "toTopic": "HV_VES_PERF3GPP"
- }
- ]
- }
- }""".trimIndent()
- val config = cut.loadConfig(StringReader(input))
- assertThat(config.collector.nonEmpty()).isTrue()
- val collector = config.collector.orNull() as PartialCollectorConfig
- assertThat(collector.routing.nonEmpty()).isTrue()
- val routing = collector.routing.orNull() as Routing
- routing.run {
- assertThat(routes.size).isEqualTo(1)
- assertThat(routes[0].domain).isEqualTo("perf3gpp")
- assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP")
- }
- }
-
it("parses disabled security configuration") {
val input = """{
"security": {
@@ -139,22 +95,13 @@ internal object FileConfigurationReaderTest : Spek({
assertThat(config.cbs.nonEmpty()).isTrue()
val cbs = config.cbs.orNull() as PartialCbsConfig
- assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7))
- assertThat(cbs.requestIntervalSec).isEqualTo(Some(900))
-
- assertThat(config.collector.nonEmpty()).isTrue()
- val collector = config.collector.orNull() as PartialCollectorConfig
- collector.run {
- assertThat(dummyMode).isEqualTo(Some(false))
- assertThat(maxRequestSizeBytes).isEqualTo(Some(512000))
- assertThat(kafkaServers.nonEmpty()).isTrue()
- assertThat(routing.nonEmpty()).isTrue()
- }
+ assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7)))
+ assertThat(cbs.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900)))
assertThat(config.server.nonEmpty()).isTrue()
val server = config.server.orNull() as PartialServerConfig
server.run {
- assertThat(idleTimeoutSec).isEqualTo(Some(1200))
+ assertThat(idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
assertThat(listenPort).isEqualTo(Some(6000))
assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000))
}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt
deleted file mode 100644
index f70c4337..00000000
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.Gson
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonParseException
-import com.google.gson.reflect.TypeToken
-import com.nhaarman.mockitokotlin2.mock
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.AddressAdapter.InvalidAddressException
-import java.lang.NumberFormatException
-import kotlin.test.assertFailsWith
-
-
-internal object AddressAdapterTest : Spek({
-
- describe("deserialization") {
- val gson = Gson()
- val context = mock<JsonDeserializationContext>()
- val addressAdapterType = TypeToken.get(AddressAdapter::class.java).type
-
- val cut = AddressAdapter()
-
- given("valid string") {
- val address = "hostname:9000"
- val json = gson.toJsonTree(address)
-
- it("should return address") {
- val deserialized = cut.deserialize(json, addressAdapterType, context)
-
- assertThat(deserialized.hostName).isEqualTo("hostname")
- assertThat(deserialized.port).isEqualTo(9000)
- }
- }
-
- val invalidAddresses = mapOf(
- Pair("missingPort", InvalidAddressException::class),
- Pair("NaNPort:Hey", NumberFormatException::class),
- Pair(":6036", InvalidAddressException::class))
-
- invalidAddresses.forEach { address, exception ->
- given("invalid address string: $address") {
-
- val json = gson.toJsonTree(address)
- it("should throw exception") {
- assertFailsWith(exception) {
- cut.deserialize(json, addressAdapterType, context)
- }
- }
- }
- }
- }
-})
-
-
diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
index b49085e8..07f0702f 100644
--- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
+++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
@@ -18,19 +18,5 @@
"trustStoreFile": "trust.ks.pkcs12",
"trustStorePassword": "changeMeToo"
}
- },
- "collector": {
- "dummyMode": false,
- "maxRequestSizeBytes": 512000,
- "kafkaServers": [
- "192.168.255.1:5005",
- "192.168.255.1:5006"
- ],
- "routing": [
- {
- "fromDomain": "perf3gpp",
- "toTopic": "HV_VES_PERF3GPP"
- }
- ]
}
-} \ No newline at end of file
+}