diff options
42 files changed, 771 insertions, 891 deletions
diff --git a/development/configuration/base.json b/development/configuration/base.json index 5233f02c..c89a8288 100644 --- a/development/configuration/base.json +++ b/development/configuration/base.json @@ -16,18 +16,5 @@ "trustStoreFile": "/etc/ves-hv/ssl/trust.p12", "trustStorePassword": "onaponap" } - }, - "collector": { - "dummyMode": false, - "maxRequestSizeBytes": 1048576, - "kafkaServers": [ - "message-router-kafka:9092" - ], - "routing": [ - { - "fromDomain": "perf3gpp", - "toTopic": "HV_VES_PERF3GPP" - } - ] } }
\ No newline at end of file diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index 9f8c552b..efe0aa88 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -34,10 +34,14 @@ class ConfigurationModule { private val configReader = FileConfigurationReader() private val configValidator = ConfigurationValidator() + private lateinit var initialConfig: HvVesConfiguration + fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> = Flux.just(cmd.parse(args)) .throwOnLeft { MissingArgumentException(it.message, it.cause) } .map { it.reader().use(configReader::loadConfig) } .map { configValidator.validate(it) } .throwOnLeft { ValidationException(it.message) } + .doOnNext { initialConfig = it } + } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt index 566f2c08..3375821e 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt @@ -49,6 +49,5 @@ data class CbsConfiguration( data class CollectorConfiguration( val maxRequestSizeBytes: Int, val kafkaServers: String, - val routing: Routing, - val dummyMode: Boolean = false + val routing: Routing ) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt index aab8ecad..e5a83ac4 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt @@ -19,59 +19,8 @@ */ package org.onap.dcae.collectors.veshv.config.api.model -import arrow.core.Option -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -data class Routing(val routes: List<Route>) { +data class Route(val domain: String, val sink: KafkaSink) - fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) -} - -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) { - - fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain - - operator fun invoke(message: VesMessage): RoutedMessage = - RoutedMessage(targetTopic, partitioning(message.header), message) -} - - -/* -HvVesConfiguration DSL -*/ - -fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init) - -class RoutingBuilder { - private val routes: MutableList<RouteBuilder> = mutableListOf() - - fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder() - .apply(init) - .also { routes.add(it) } - - fun build() = Routing(routes.map { it.build() }.toList()) -} - -class RouteBuilder { - - private lateinit var domain: String - private lateinit var targetTopic: String - private lateinit var partitioning: (CommonEventHeader) -> Int - - fun fromDomain(domain: String): RouteBuilder = apply { - this.domain = domain - } - - fun toTopic(targetTopic: String): RouteBuilder = apply { - this.targetTopic = targetTopic - } - - fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply { - partitioning = { num } - } - - fun build() = Route(domain, targetTopic, partitioning) -} +typealias Routing = List<Route> diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt new file mode 100644 index 00000000..f044492c --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -0,0 +1,97 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + + +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse +import arrow.core.toOption +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel + +/** + * @author Pawel Biniek <pawel.biniek@nokia.com> + * @since March 2019 + */ +internal class ConfigurationMerger { + fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration = + PartialConfiguration( + mergeServerConfig(base.server, update.server), + mergeCbsConfig(base.cbs, update.cbs), + mergeSecurityConfig(base.security, update.security), + mergeCollectorConfig(base.collector, update.collector), + mergeLogLevel(base.logLevel, update.logLevel) + ) + + + private fun mergeServerConfig(baseOption: Option<PartialServerConfig>, + updateOption: Option<PartialServerConfig>) = + applyUpdate(baseOption, updateOption) { base, update -> + PartialServerConfig( + base.listenPort.updateToGivenOrNone(update.listenPort), + base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec), + base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes) + ) + } + + + private fun mergeCbsConfig(baseOption: Option<PartialCbsConfig>, + updateOption: Option<PartialCbsConfig>) = + applyUpdate(baseOption, updateOption) { base, update -> + PartialCbsConfig( + base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec), + base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec) + ) + } + + private fun mergeSecurityConfig(baseOption: Option<PartialSecurityConfig>, + updateOption: Option<PartialSecurityConfig>) = + applyUpdate(baseOption, updateOption) { base, update -> + PartialSecurityConfig( + base.keys.updateToGivenOrNone(update.keys) + ) + } + + private fun mergeCollectorConfig(baseOption: Option<PartialCollectorConfig>, + updateOption: Option<PartialCollectorConfig>) = + applyUpdate(baseOption, updateOption) { base, update -> + PartialCollectorConfig( + base.maxRequestSizeBytes.updateToGivenOrNone(update.maxRequestSizeBytes), + base.kafkaServers.updateToGivenOrNone(update.kafkaServers), + base.routing.updateToGivenOrNone(update.routing) + ) + } + + + private fun mergeLogLevel(base: Option<LogLevel>, update: Option<LogLevel>) = + base.updateToGivenOrNone(update) +} + +private fun <T> applyUpdate(base: Option<T>, update: Option<T>, overrider: (base: T, update: T) -> T) = + when { + base is Some && update is Some -> overrider(base.t, update.t).toOption() + base is Some && update is None -> base + base is None && update is Some -> update + else -> None + } + +private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) = + update.getOrElse(this::orNull).toOption() diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index 90be3dbd..04bba7e2 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Either import arrow.core.None import arrow.core.Option -import arrow.core.Some import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration @@ -54,14 +53,20 @@ internal class ConfigurationValidator { val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys) - val collectorConfiguration = partialConfig.collector.bind() - .let { createCollectorConfig(it).bind() } +// TOD0: retrieve when ConfigurationMerger is implemented +// val collectorConfiguration = partialConfig.collector.bind() +// .let { createCollectorConfig(it).bind() } HvVesConfiguration( serverConfiguration, cbsConfiguration, securityConfiguration, - collectorConfiguration, +// TOD0: swap when ConfigurationMerger is implemented +// collectorConfiguration + CollectorConfiguration(-1, + "I do not exist. I'm not even a URL :o", + emptyList()), +// end TOD0 logLevel ) }.toEither { ValidationError("Some required configuration options are missing") } @@ -79,7 +84,7 @@ internal class ConfigurationValidator { partial.mapBinding { ServerConfiguration( it.listenPort.bind(), - Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()), + it.idleTimeoutSec.bind(), it.maxPayloadSizeBytes.bind() ) } @@ -87,20 +92,20 @@ internal class ConfigurationValidator { private fun createCbsConfiguration(partial: PartialCbsConfig) = partial.mapBinding { CbsConfiguration( - Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()), - Duration.ofSeconds(it.requestIntervalSec.bind().toLong()) + it.firstRequestDelaySec.bind(), + it.requestIntervalSec.bind() ) } - private fun createCollectorConfig(partial: PartialCollectorConfig) = - partial.mapBinding { - CollectorConfiguration( - it.maxRequestSizeBytes.bind(), - toKafkaServersString(it.kafkaServers.bind()), - it.routing.bind(), - it.dummyMode.bind() - ) - } +// TOD0: retrieve when ConfigurationMerger is implemented +// private fun createCollectorConfig(partial: PartialCollectorConfig) = +// partial.mapBinding { +// CollectorConfiguration( +// it.maxRequestSizeBytes.bind(), +// toKafkaServersString(it.kafkaServers.bind()), +// it.routing.bind() +// ) +// } private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String = kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt index 1e77dde5..9513107b 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt @@ -21,16 +21,12 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.Option import com.google.gson.GsonBuilder -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.AddressAdapter +import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.RouteAdapter -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.RoutingAdapter import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys + import java.io.Reader -import java.net.InetSocketAddress +import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -38,11 +34,9 @@ import java.net.InetSocketAddress */ internal class FileConfigurationReader { private val gson = GsonBuilder() - .registerTypeAdapter(InetSocketAddress::class.java, AddressAdapter()) - .registerTypeAdapter(Route::class.java, RouteAdapter()) - .registerTypeAdapter(Routing::class.java, RoutingAdapter()) .registerTypeAdapter(Option::class.java, OptionAdapter()) .registerTypeAdapter(PartialSecurityConfig::class.java, SecurityAdapter()) + .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter()) .create() fun loadConfig(input: Reader): PartialConfiguration = diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt index 3e6df3e0..a27998e1 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys import java.net.InetSocketAddress +import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -40,19 +41,18 @@ internal data class PartialConfiguration( internal data class PartialServerConfig( val listenPort: Option<Int> = None, - val idleTimeoutSec: Option<Int> = None, + val idleTimeoutSec: Option<Duration> = None, val maxPayloadSizeBytes: Option<Int> = None ) internal data class PartialCbsConfig( - val firstRequestDelaySec: Option<Int> = None, - val requestIntervalSec: Option<Int> = None + val firstRequestDelaySec: Option<Duration> = None, + val requestIntervalSec: Option<Duration> = None ) internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None) internal data class PartialCollectorConfig( - val dummyMode: Option<Boolean> = None, val maxRequestSizeBytes: Option<Int> = None, val kafkaServers: Option<List<InetSocketAddress>> = None, val routing: Option<Routing> = None diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt deleted file mode 100644 index 255be03a..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl.gsonadapters - -import com.google.gson.JsonDeserializationContext -import com.google.gson.JsonDeserializer -import com.google.gson.JsonElement -import com.google.gson.JsonParseException -import java.lang.reflect.Type -import java.net.InetSocketAddress - -/** - * @author Pawel Biniek <pawel.biniek@nokia.com> - * @since February 2019 - */ -internal class AddressAdapter : JsonDeserializer<InetSocketAddress> { - override fun deserialize( - json: JsonElement, - typeOfT: Type, - context: JsonDeserializationContext): InetSocketAddress { - val portStart = json.asString.lastIndexOf(":") - if (portStart > 0) { - val address = json.asString.substring(0, portStart) - val port = json.asString.substring(portStart + 1) - return InetSocketAddress(address, port.toInt()) - } else throw InvalidAddressException("Cannot parse '" + json.asString + "' to address") - } - - class InvalidAddressException(reason: String) : RuntimeException(reason) -} - diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt index 4b299098..99da1102 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt @@ -22,19 +22,15 @@ package org.onap.dcae.collectors.veshv.config.impl.gsonadapters import com.google.gson.JsonDeserializationContext import com.google.gson.JsonDeserializer import com.google.gson.JsonElement -import com.google.gson.reflect.TypeToken -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing import java.lang.reflect.Type +import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> * @since March 2019 */ -internal class RoutingAdapter : JsonDeserializer<Routing> { - override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext): Routing { - val parametrizedType = TypeToken.getParameterized(List::class.java, Route::class.java).type - return Routing(context.deserialize<List<Route>>(json, parametrizedType)) - } +class DurationOfSecondsAdapter : JsonDeserializer<Duration> { + override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext) = + Duration.ofSeconds(json.asLong) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt deleted file mode 100644 index 25cb8861..00000000 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl.gsonadapters - -import com.google.gson.JsonDeserializationContext -import com.google.gson.JsonDeserializer -import com.google.gson.JsonElement -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.RouteBuilder -import java.lang.reflect.Type - -/** - * @author Pawel Biniek <pawel.biniek@nokia.com> - * @since March 2019 - */ -internal class RouteAdapter : JsonDeserializer<Route> { - override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext?): Route { - val jobj = json.asJsonObject - return RouteBuilder() - .fromDomain(jobj["fromDomain"].asString) - .toTopic(jobj["toTopic"].asString) - .withFixedPartitioning() - .build() - } - -} diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt new file mode 100644 index 00000000..d5b18e68 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt @@ -0,0 +1,102 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Some +import org.jetbrains.spek.api.Spek +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import java.io.InputStreamReader +import java.io.Reader +import java.time.Duration + +/** + * @author Pawel Biniek <pawel.biniek@nokia.com> + * @since February 2019 + */ +internal object ConfigurationMergerTest : Spek({ + describe("Merges partial configurations into one") { + it("merges single parameter into empty config") { + val actual = PartialConfiguration() + val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + + val result = ConfigurationMerger().merge(actual, diff) + + assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO)) + } + + it("merges single embedded parameter into empty config") { + val actual = PartialConfiguration() + val serverConfig = PartialServerConfig(listenPort = Some(45)) + val diff = PartialConfiguration(server = Some(serverConfig)) + + val result = ConfigurationMerger().merge(actual, diff) + + assertThat(result.server).isEqualTo(Some(serverConfig)) + } + + it("merges single parameter into full config") { + val actual = FileConfigurationReader().loadConfig( + InputStreamReader( + FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) + val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + + val result = ConfigurationMerger().merge(actual, diff) + + assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO)) + } + + it("merges single embedded parameter into full config") { + val actual = FileConfigurationReader().loadConfig( + InputStreamReader( + FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) + val serverConfig = PartialServerConfig(listenPort = Some(45)) + val diff = PartialConfiguration(server = Some(serverConfig)) + + val result = ConfigurationMerger().merge(actual, diff) + + assertThat(result.server.orNull()?.listenPort).isEqualTo(serverConfig.listenPort) + assertThat(result.server.orNull()?.idleTimeoutSec?.isEmpty()).isFalse() + assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200))) + assertThat(result.server.orNull()?.maxPayloadSizeBytes?.isEmpty()).isFalse() + assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000)) + } + + it("merges full config into single parameter") { + val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + val diff = FileConfigurationReader().loadConfig( + InputStreamReader( + FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) + + val result = ConfigurationMerger().merge(actual, diff) + + assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR)) + assertThat(result.server.isEmpty()).isFalse() + assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000)) + assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200))) + + assertThat(result.security.isEmpty()).isFalse() + assertThat(result.cbs.isEmpty()).isFalse() + } + } +}) + diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index beb5df61..4b89488b 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -28,7 +28,7 @@ import org.assertj.core.api.Assertions.fail import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys @@ -53,21 +53,20 @@ internal object ConfigurationValidatorTest : Spek({ val config = PartialConfiguration( Some(PartialServerConfig( Some(1), - Some(2), + Some(Duration.ofSeconds(2)), Some(3) )), Some(PartialCbsConfig( - Some(5), - Some(3) + Some(Duration.ofSeconds(5)), + Some(Duration.ofSeconds(3)) )), Some(PartialSecurityConfig( Some(mock()) )), Some(PartialCollectorConfig( - Some(true), Some(4), Some(emptyList()), - Some(routing { }.build()) + someFromEmptyRouting )), None ) @@ -86,10 +85,9 @@ internal object ConfigurationValidatorTest : Spek({ } describe("validating complete configuration") { - val idleTimeoutSec = 10 - val firstReqDelaySec = 10 + val idleTimeoutSec = Duration.ofSeconds(10L) + val firstReqDelaySec = Duration.ofSeconds(10L) val securityKeys = Some(mock<SecurityKeys>()) - val routing = routing { }.build() val config = PartialConfiguration( Some(PartialServerConfig( @@ -99,16 +97,15 @@ internal object ConfigurationValidatorTest : Spek({ )), Some(PartialCbsConfig( Some(firstReqDelaySec), - Some(3) + Some(Duration.ofSeconds(3)) )), Some(PartialSecurityConfig( securityKeys )), Some(PartialCollectorConfig( - Some(true), Some(4), Some(emptyList()), - Some(routing) + someFromEmptyRouting )), Some(LogLevel.INFO) ) @@ -121,26 +118,25 @@ internal object ConfigurationValidatorTest : Spek({ }, { assertThat(it.server.idleTimeout) - .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong())) + .isEqualTo(idleTimeoutSec) assertThat(it.security.keys) .isEqualTo(securityKeys) assertThat(it.cbs.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong())) + .isEqualTo(firstReqDelaySec) assertThat(it.collector.routing) - .isEqualTo(routing) + .isEqualTo(emptyRouting) } ) } } describe("validating configuration with security disabled") { - val idleTimeoutSec = 10 - val firstReqDelaySec = 10 + val idleTimeoutSec = Duration.ofSeconds(10) + val firstReqDelaySec = Duration.ofSeconds(10) val securityKeys: Option<SecurityKeys> = None - val routing = routing { }.build() val config = PartialConfiguration( Some(PartialServerConfig( @@ -150,16 +146,15 @@ internal object ConfigurationValidatorTest : Spek({ )), Some(PartialCbsConfig( Some(firstReqDelaySec), - Some(3) + Some(Duration.ofSeconds(3)) )), Some(PartialSecurityConfig( securityKeys )), Some(PartialCollectorConfig( - Some(true), Some(4), Some(emptyList()), - Some(routing) + someFromEmptyRouting )), Some(LogLevel.INFO) ) @@ -172,16 +167,16 @@ internal object ConfigurationValidatorTest : Spek({ }, { assertThat(it.server.idleTimeout) - .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong())) + .isEqualTo(idleTimeoutSec) assertThat(it.security.keys) .isEqualTo(securityKeys) assertThat(it.cbs.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong())) + .isEqualTo(firstReqDelaySec) assertThat(it.collector.routing) - .isEqualTo(routing) + .isEqualTo(emptyRouting) } ) } @@ -189,3 +184,6 @@ internal object ConfigurationValidatorTest : Spek({ } }) + +val emptyRouting: Routing = emptyList() +val someFromEmptyRouting = Some(emptyRouting) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt index 8267e304..4e35bfb3 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt @@ -24,11 +24,11 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import java.io.StringReader import java.net.InetSocketAddress +import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -59,50 +59,6 @@ internal object FileConfigurationReaderTest : Spek({ assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003)) } - it("parses ip address") { - val input = """{ "collector" : { - "kafkaServers": [ - "192.168.255.1:5005", - "192.168.255.26:5006" - ] - } - }""" - - val config = cut.loadConfig(StringReader(input)) - assertThat(config.collector.nonEmpty()).isTrue() - val collector = config.collector.orNull() as PartialCollectorConfig - assertThat(collector.kafkaServers.nonEmpty()).isTrue() - val addresses = collector.kafkaServers.orNull() as List<InetSocketAddress> - assertThat(addresses) - .isEqualTo(listOf( - InetSocketAddress("192.168.255.1", 5005), - InetSocketAddress("192.168.255.26", 5006) - )) - } - - it("parses routing array with RoutingAdapter") { - val input = """{ - "collector" : { - "routing" : [ - { - "fromDomain": "perf3gpp", - "toTopic": "HV_VES_PERF3GPP" - } - ] - } - }""".trimIndent() - val config = cut.loadConfig(StringReader(input)) - assertThat(config.collector.nonEmpty()).isTrue() - val collector = config.collector.orNull() as PartialCollectorConfig - assertThat(collector.routing.nonEmpty()).isTrue() - val routing = collector.routing.orNull() as Routing - routing.run { - assertThat(routes.size).isEqualTo(1) - assertThat(routes[0].domain).isEqualTo("perf3gpp") - assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP") - } - } - it("parses disabled security configuration") { val input = """{ "security": { @@ -139,22 +95,13 @@ internal object FileConfigurationReaderTest : Spek({ assertThat(config.cbs.nonEmpty()).isTrue() val cbs = config.cbs.orNull() as PartialCbsConfig - assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7)) - assertThat(cbs.requestIntervalSec).isEqualTo(Some(900)) - - assertThat(config.collector.nonEmpty()).isTrue() - val collector = config.collector.orNull() as PartialCollectorConfig - collector.run { - assertThat(dummyMode).isEqualTo(Some(false)) - assertThat(maxRequestSizeBytes).isEqualTo(Some(512000)) - assertThat(kafkaServers.nonEmpty()).isTrue() - assertThat(routing.nonEmpty()).isTrue() - } + assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7))) + assertThat(cbs.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900))) assertThat(config.server.nonEmpty()).isTrue() val server = config.server.orNull() as PartialServerConfig server.run { - assertThat(idleTimeoutSec).isEqualTo(Some(1200)) + assertThat(idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200))) assertThat(listenPort).isEqualTo(Some(6000)) assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000)) } diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt deleted file mode 100644 index f70c4337..00000000 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl.gsonadapters - -import com.google.gson.Gson -import com.google.gson.JsonDeserializationContext -import com.google.gson.JsonParseException -import com.google.gson.reflect.TypeToken -import com.nhaarman.mockitokotlin2.mock -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.AddressAdapter.InvalidAddressException -import java.lang.NumberFormatException -import kotlin.test.assertFailsWith - - -internal object AddressAdapterTest : Spek({ - - describe("deserialization") { - val gson = Gson() - val context = mock<JsonDeserializationContext>() - val addressAdapterType = TypeToken.get(AddressAdapter::class.java).type - - val cut = AddressAdapter() - - given("valid string") { - val address = "hostname:9000" - val json = gson.toJsonTree(address) - - it("should return address") { - val deserialized = cut.deserialize(json, addressAdapterType, context) - - assertThat(deserialized.hostName).isEqualTo("hostname") - assertThat(deserialized.port).isEqualTo(9000) - } - } - - val invalidAddresses = mapOf( - Pair("missingPort", InvalidAddressException::class), - Pair("NaNPort:Hey", NumberFormatException::class), - Pair(":6036", InvalidAddressException::class)) - - invalidAddresses.forEach { address, exception -> - given("invalid address string: $address") { - - val json = gson.toJsonTree(address) - it("should throw exception") { - assertFailsWith(exception) { - cut.deserialize(json, addressAdapterType, context) - } - } - } - } - } -}) - - diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json index b49085e8..07f0702f 100644 --- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json +++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json @@ -18,19 +18,5 @@ "trustStoreFile": "trust.ks.pkcs12", "trustStorePassword": "changeMeToo" } - }, - "collector": { - "dummyMode": false, - "maxRequestSizeBytes": 512000, - "kafkaServers": [ - "192.168.255.1:5005", - "192.168.255.1:5006" - ], - "routing": [ - { - "fromDomain": "perf3gpp", - "toTopic": "HV_VES_PERF3GPP" - } - ] } -}
\ No newline at end of file +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 0a1e2d43..1b92d90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -26,13 +27,21 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux -interface Sink { +interface Sink : Closeable { + fun send(message: RoutedMessage) = send(Flux.just(message)) + fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> } +interface SinkProvider : Closeable { + operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> +} + +typealias ConfigurationProvider = () -> Flux<Routing> + interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) @@ -42,11 +51,3 @@ interface Metrics { fun notifyClientConnected() fun notifyClientRejected(cause: ClientRejectionCause) } - -interface SinkProvider : Closeable { - operator fun invoke(ctx: ClientContext): Sink -} - -interface ConfigurationProvider { - operator fun invoke(): Flux<Sequence<KafkaSink>> -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index fa4f9670..2b29acd9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Sequence<KafkaSink>>() + val config = AtomicReference<Routing>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,17 +71,15 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(kafkaSinks, ctx), - sink = sinkProvider(ctx), + router = Router(routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) } } - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index d2c35cbb..6e2e20f7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,39 +19,75 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.None +import arrow.core.toOption +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux -class Router(private val routing: Routing, private val ctx: ClientContext) { +class Router internal constructor(private val routing: Routing, + private val messageSinks: Map<String, Lazy<Sink>>, + private val ctx: ClientContext, + private val metrics: Metrics) { + constructor(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext, + metrics: Metrics) : + this(routing, + constructMessageSinks(routing, sinkProvider, ctx), + ctx, + metrics) { + logger.debug(ctx::mdc) { "Routing for client: $routing" } + logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" } + } - constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( - routing { - kafkaSinks.forEach { - defineRoute { - fromDomain(it.name()) - toTopic(it.topicName()) - withFixedPartitioning() + fun route(message: VesMessage): Flux<ConsumedMessage> = + routeFor(message.header) + .fold({ + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } + Flux.empty<Route>() + }, { + logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } + Flux.just(it) + }) + .flatMap { + val sinkTopic = it.sink.topicName() + messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } - } - }.build(), - ctx - ) - - fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) }.also { - if (it.isEmpty()) { - logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } - } + + private fun routeFor(header: CommonEventHeader) = + routing.find { it.domain == header.domain }.toOption() + + private fun messageSinkFor(sinkTopic: String) = messageSinks + .getOrElse(sinkTopic) { + throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic") } companion object { - private val logger = Logger(Routing::class) + private val logger = Logger(Router::class) + private val NONE_PARTITION = None + + internal fun constructMessageSinks(routing: Routing, + sinkProvider: SinkProvider, + ctx: ClientContext) = + routing.map(Route::sink) + .distinctBy { it.topicName() } + .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) } + + private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message) } + +internal class MissingMessageSinkException(msg: String) : Throwable(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 6a2792c3..433e4d57 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -31,15 +30,12 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE -import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MessageEither -import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -53,7 +49,6 @@ internal class VesHvCollector( private val wireChunkDecoder: WireChunkDecoder, private val protobufDecoder: VesDecoder, private val router: Router, - private val sink: Sink, private val metrics: Metrics) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = @@ -62,10 +57,10 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - .transform(::routeMessage) - .onErrorResume { - metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) - logger.handleReactiveStreamError(clientContext, it) } + // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here + .handleErrors() + .transform(::route) + .handleErrors() .doFinally { releaseBuffersMemory() } .then() @@ -98,18 +93,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux - .flatMap(this::findRoute) - .compose(sink::send) + private fun route(flux: Flux<VesMessage>) = flux + .flatMap(router::route) .doOnNext(this::updateSinkMetrics) - private fun findRoute(msg: VesMessage) = router - .findDestination(msg) - .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) } - .filterEmptyWithLog(logger, clientContext::fullMdc, - { "Found route for message: ${it.topic}, partition: ${it.partition}" }, - { "Could not find route for message" }) - private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { when (consumedMessage) { is SuccessfullyConsumedMessage -> @@ -119,6 +106,11 @@ internal class VesHvCollector( } } + private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume { + metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) + logger.handleReactiveStreamError(clientContext, it) + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index c362020e..20b11753 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,11 +31,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = - if (config.dummyMode) - LoggingSinkProvider() - else - KafkaSinkProvider(config) + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index f9fd6986..185693c0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,18 +22,20 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux @@ -73,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<Sequence<KafkaSink>> = + override fun invoke(): Flux<Routing> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -81,25 +83,25 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient + private fun handleUpdates(cbsClient: CbsClient) = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createCollectorConfiguration) + .map(::createRoutingDescription) .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } .retryWhen(retry) - - private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> = - try { - DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .asSequence() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } + private fun createRoutingDescription(configuration: JsonObject): Routing = try { + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .map { Route(it.name(), it) } + .asIterable() + .toList() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } companion object { private const val MAX_RETRIES = 5L diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt deleted file mode 100644 index 3a9467f7..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ConsumedMessage -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import java.util.concurrent.atomic.AtomicLong - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -internal class LoggingSinkProvider : SinkProvider { - - override fun invoke(ctx: ClientContext): Sink { - return object : Sink { - private val totalMessages = AtomicLong() - private val totalBytes = AtomicLong() - - override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = - messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage) - - private fun logMessage(msg: RoutedMessage) { - val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong()) - val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } - if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(ctx, logMessageSupplier) - else - logger.trace(ctx, logMessageSupplier) - } - - } - } - - companion object { - const val INFO_LOGGING_FREQ = 100_000 - private val logger = Logger(LoggingSinkProvider::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 5052cc5c..2ce0f42f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext @@ -39,41 +40,39 @@ import reactor.kafka.sender.SenderRecord * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, - private val ctx: ClientContext) : Sink { +internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>, + private val ctx: ClientContext) : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = - messages.map(::vesToKafkaRecord).let { records -> - sender.send(records).map { - val msg = it.correlationMetadata() - if (it.exception() == null) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - "Message sent to Kafka with metadata: ${it.recordMetadata()}" + messages.map(::vesToKafkaRecord) + .compose { sender.send(it) } + .map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) + } else { + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } - SuccessfullyConsumedMessage(msg) - } else { - logger.warn(ctx::fullMdc, Marker.Invoke()) { - "Failed to send message to Kafka. Reason: ${it.exception().message}" - } - logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } - FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } - } - } private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> = SenderRecord.create( - routed.topic, - routed.partition, + routed.targetTopic, + routed.partition.orNull(), FILL_TIMESTAMP_LATER, routed.message.header, routed.message, routed) - internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender - companion object { private val FILL_TIMESTAMP_LATER: Long? = null - private val logger = Logger(KafkaSink::class) + private val logger = Logger(KafkaPublisher::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 96e45a02..7a498652 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,66 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import arrow.effects.IO -import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.lang.Integer.max +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -internal class KafkaSinkProvider internal constructor( - private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider { - - constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) - - override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) +internal class KafkaSinkProvider : SinkProvider { + private val messageSinks = synchronizedMap( + mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>() + ) + + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { + KafkaPublisher(it, ctx) + } + } override fun close() = IO { - kafkaSender.close() - logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" } + messageSinks.values.forEach { it.close() } + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } } companion object { private val logger = Logger(KafkaSinkProvider::class) - private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f - private const val BUFFER_MEMORY_MULTIPLIER = 32 - private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - - private fun constructKafkaSender(config: CollectorConfiguration) = - KafkaSender.create(constructSenderOptions(config)) - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) - .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) - .producerProperty(RETRIES_CONFIG, 1) - .producerProperty(ACKS_CONFIG, "1") - .stopOnError(false) - - private fun maxRequestSize(maxRequestSizeBytes: Int) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - - private fun bufferMemory(maxRequestSizeBytes: Int) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt new file mode 100644 index 00000000..40de8c51 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.apache.kafka.clients.producer.ProducerConfig +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions + + +private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f +private const val BUFFER_MEMORY_MULTIPLIER = 32 +private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 + +internal fun createKafkaSender(sinkStream: SinkStream) = + (sinkStream as KafkaSink).let { kafkaSink -> + KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSink.bootstrapServers()) + .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(kafkaSink)) + .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(kafkaSink)) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) + .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .stopOnError(false) + ) + } + +private fun maxRequestSize(kafkaSink: KafkaSink) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()).toInt() + +private fun bufferMemory(kafkaSink: KafkaSink) = + Integer.max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * kafkaSink.maxPayloadSizeBytes()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 96298167..6b9c6803 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +20,30 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None -import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import reactor.core.publisher.Flux +import reactor.test.StepVerifier /** @@ -42,72 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame * @since May 2018 */ object RouterTest : Spek({ - given("sample configuration") { - val config = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic("ves_rtpm") - withFixedPartitioning(2) - } + describe("Router") { - defineRoute { - fromDomain(SYSLOG.domainName) - toTopic("ves_trace") - withFixedPartitioning() - } - }.build() - val cut = Router(config, ClientContext()) + whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) - on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + val messageSinkMap = mapOf( + Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(syslogTopic, lazyOf(messageSinkMock)) + ) - it("should have route available") { - assertThat(result).isNotNull() - } + given("sample routing specification") { + val cut = router(defaultRouting, messageSinkMap) - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) - } + on("message with existing route (rtpm)") { + whenever(messageSinkMock.send(routedPerf3GppMessage)) + .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage)) - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) - } + it("should be properly routed") { + val result = cut.route(perf3gppMessage) - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } + assertThat(result).isNotNull() + StepVerifier.create(result) + .expectNext(successfullyConsumedPerf3gppMessage) + .verifyComplete() - on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() + verify(perf3gppSinkMock).topicName() + verify(messageSinkMock).send(routedPerf3GppMessage) + } } - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) - } + on("message with existing route (syslog)") { + whenever(messageSinkMock.send(routedSyslogMessage)) + .thenReturn(Flux.just(successfullyConsumedSyslogMessage)) + val result = cut.route(syslogMessage) - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) - } + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedSyslogMessage) + .verifyComplete() - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + verify(syslogSinkMock).topicName() + verify(messageSinkMock).send(routedSyslogMessage) + } } - } - on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + on("message with unknown route") { + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val result = cut.route(message) - it("should not have route available") { - assertThat(result).isEqualTo(None) + it("should not have route available") { + StepVerifier.create(result).verifyComplete() + } } } } -})
\ No newline at end of file + +}) + +private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) = + Router(routing, kafkaPublisherMap, ClientContext(), mock()) + +private val perf3gppTopic = "PERF_PERF" +private val perf3gppSinkMock = mock<KafkaSink>() +private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) + +private val syslogTopic = "SYS_LOG" +private val syslogSinkMock = mock<KafkaSink>() +private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) + +private val messageSinkMock = mock<Sink>() +private val default_partition = None + +private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) +private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) + +private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) +private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt index 571a6680..8616ce03 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt @@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import reactor.core.publisher.Flux + import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({ .expectNoEvent(waitTime) } } - } + given("valid configuration from cbs") { val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) @@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val receivedSink1 = it.elementAt(0) - val receivedSink2 = it.elementAt(1) + val route1 = it.elementAt(0) + val route2 = it.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) assertThat(receivedSink1.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) assertThat(receivedSink2.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + }.verifyComplete() } } @@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({ }) + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + private val aafCredentials1 = ImmutableAafCredentials.builder() .username("client") .password("very secure password") @@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder() private val validConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", @@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse(""" "topic_name": "REG_HVVES_PERF3GPP" } }, - "perf3gpp_central": { + "$PERF3GPP_CENTRAL": { "type": "kafka", "aaf_credentials": { "username": "other_client", @@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse(""" private val invalidConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt deleted file mode 100644 index 1e3f2e7a..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import arrow.syntax.collections.tail -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.routing -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.ves.VesEventOuterClass -import reactor.kafka.sender.KafkaSender - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since December 2018 - */ -internal object KafkaSinkProviderTest : Spek({ - describe("non functional requirements") { - given("sample configuration") { - val config = CollectorConfiguration( - dummyMode = false, - maxRequestSizeBytes = 1024 * 1024, - kafkaServers = "localhost:9090", - routing = routing { }.build()) - - val cut = KafkaSinkProvider(config) - - on("sample clients") { - val clients = listOf( - ClientContext(), - ClientContext(), - ClientContext(), - ClientContext()) - - it("should create only one instance of KafkaSender") { - val sinks = clients.map(cut::invoke) - val firstSink = sinks[0] - val restOfSinks = sinks.tail() - - assertThat(restOfSinks).isNotEmpty - assertThat(restOfSinks).allSatisfy { sink -> - assertThat(firstSink.usesSameSenderAs(sink)) - .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") - .isTrue() - } - } - } - } - - given("dummy KafkaSender") { - val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock() - val cut = KafkaSinkProvider(kafkaSender) - - on("close") { - cut.close().unsafeRunSync() - - it("should close KafkaSender") { - verify(kafkaSender).close() - } - } - } - } -}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index a6b32ed9..92719e94 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -107,8 +107,8 @@ object MetricsSpecification : Spek({ assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC)) .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric") .isEqualTo(2) - assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)) - .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric") + assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC)) + .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric") .isEqualTo(1) } } @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 50fe098c..61a9a356 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec logger.info { "Processed $runs connections each containing $numMessages msgs." } - logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } + logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) @@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({ }) -private const val ONE_MILION = 1_000_000.0 +private const val ONE_MILLION = 1_000_000.0 private val rand = Random() private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index eb3ba264..ec540606 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : AutoCloseable { +class Sut(sink: Sink = StoringSink()) : Closeable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -60,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider) + healthStateProvider + ) + private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector @@ -68,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { throw IllegalStateException("Collector not available.") } - override fun close() { - collectorProvider.close().unsafeRunSync() + + fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) + return sink.sentMessages + } + + fun handleConnection(vararg packets: ByteBuf) { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) } + override fun close() = collectorProvider.close() + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } - class DummySinkProvider(private val sink: Sink) : SinkProvider { - private val active = AtomicBoolean(true) + private val sinkInitialized = AtomicBoolean(false) - override fun invoke(ctx: ClientContext) = sink - - override fun close() = IO { - active.set(false) + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + sinkInitialized.set(true) + sink } - val closed get() = !active.get() - + override fun close() = + if (sinkInitialized.get()) { + sink.close() + } else { + IO.unit + } } private val timeout = Duration.ofSeconds(10) -fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) - return sink.sentMessages -} - -fun Sut.handleConnection(vararg packets: ByteBuf) { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) -} - -fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 21c5c189..5d215fc5 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import arrow.core.None import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -65,12 +65,28 @@ object VesHvSpecification : Spek({ .hasSize(2) } + it("should create sink lazily") { + val (sut, sink) = vesHvWithStoringSink() + + // just connecting should not create sink + sut.handleConnection() + sut.close().unsafeRunSync() + + // then + assertThat(sink.closed).isFalse() + } + it("should close sink when closing collector provider") { - val (sut, _) = vesHvWithStoringSink() + val (sut, sink) = vesHvWithStoringSink() + // given Sink initialized + // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent + sut.handleConnection(vesWireFrameMessage(PERF3GPP)) - sut.close() + // when + sut.close().unsafeRunSync() - assertThat(sut.sinkProvider.closed).isTrue() + // then + assertThat(sink.closed).isTrue() } } @@ -145,14 +161,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) - assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None) } it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -161,14 +177,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(3) - assertThat(messages[0].topic).describedAs("first message topic") + assertThat(messages[0].targetTopic).describedAs("first message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[1].topic).describedAs("second message topic") + assertThat(messages[1].targetTopic).describedAs("second message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[2].topic).describedAs("last message topic") - .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + assertThat(messages[2].targetTopic).describedAs("last message topic") + .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) } it("should drop message if route was not found") { @@ -181,7 +197,7 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } @@ -205,7 +221,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,21 +229,21 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configWithEmptyRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] - assertThat(message.topic).describedAs("routed message topic after configuration's change") + assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(PERF3GPP_TOPIC) assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should change domain routing") { @@ -236,22 +252,22 @@ object VesHvSpecification : Spek({ assertThat(messages).hasSize(1) val firstMessage = messages[0] - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") .isEqualTo(PERF3GPP_TOPIC) assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should update routing for each client sending one message") { @@ -261,7 +277,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -269,8 +285,8 @@ object VesHvSpecification : Spek({ val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messagesAmount) assertThat(messagesForEachTopic) @@ -287,7 +303,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) println("config changed") } } @@ -297,8 +313,8 @@ object VesHvSpecification : Spek({ sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messageStreamSize) assertThat(firstTopicMessagesCount) @@ -320,7 +336,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +365,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 6599d402..c465fd91 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,67 +20,21 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcae.collectors.veshv.config.api.model.Routing import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException -const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" -const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" -const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" - -val configWithBasicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithTwoDomainsToOneTopicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(HEARTBEAT.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(MEASUREMENT.domainName) - .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithDifferentRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(ALTERNATE_PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() - ) - -val configWithEmptyRouting = emptySequence<KafkaSink>() - - class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create() + private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() - fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(kafkaSinkSequence) + configStream.onNext(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index b599a076..a450b794 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ class FakeMetrics : Metrics { override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ - messagesSentToTopic.compute(msg.topic) { k, _ -> + messagesSentToTopic.compute(msg.targetTopic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt new file mode 100644 index 00000000..e9914ef1 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink + +const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" +const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" +const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092" +const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + +private val perf3gppKafkaSink = ImmutableKafkaSink.builder() + .name("PERF3GPP") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() +private val alternativeKafkaSink = ImmutableKafkaSink.builder() + .name("ALTERNATE") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() + + +val basicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink) +) + +val alternativeRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink) +) + +val twoDomainsToOneTopicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink), + Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink), + Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink) +) + +val emptyRouting: Routing = emptyList() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 51f724e0..160defdb 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage @@ -30,6 +31,7 @@ import reactor.core.publisher.Flux import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong /** @@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong */ class StoringSink : Sink { private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + private val active = AtomicBoolean(true) + val closed get() = !active.get() val sentMessages: List<RoutedMessage> get() = sent.toList() @@ -45,6 +49,13 @@ class StoringSink : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } + + /* + * TOD0: if the code would look like: + * ```IO { active.set(false) }``` + * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) + */ + override fun close() = active.set(false).run { IO.unit } } /** diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt index e4d147b1..04f9be63 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,4 +19,9 @@ */ package org.onap.dcae.collectors.veshv.domain -data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) +import arrow.core.Option + + +data class RoutedMessage(val message: VesMessage, + val targetTopic: String, + val partition: Option<Int>) diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json index 7f88cb6e..67576c80 100644 --- a/sources/hv-collector-main/src/main/docker/base.json +++ b/sources/hv-collector-main/src/main/docker/base.json @@ -12,7 +12,6 @@ "security": { }, "collector": { - "dummyMode": false, "maxRequestSizeBytes": 1048576, "kafkaServers": [ "message-router-kafka:9092" diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 2fb44768..c04c2c95 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { val now = Instant.now() sentMessages.increment() - sentMessagesByTopic(msg.topic).increment() + sentMessagesByTopic(msg.targetTopic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now)) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index d15dccef..aed4d928 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -59,9 +60,10 @@ object VesServer { private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = CollectorFactory( AdapterFactory.configurationProvider(config.cbs), - AdapterFactory.sinkCreatorFactory(config.collector), + AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes + config.server.maxPayloadSizeBytes, + HealthState.INSTANCE ) private fun logServerStarted(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index e452a5f4..f260f158 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.main +import arrow.core.Option import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge @@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame +import org.onap.ves.VesEventOuterClass import java.time.Instant import java.time.temporal.Temporal import java.util.concurrent.TimeUnit @@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({ }) fun routedMessage(topic: String, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } + vesEvent().run { toRoutedMessage(topic, partition) } fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) - } + vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = - vesEvent().let { evt -> - val builder = evt.toBuilder() + vesEvent().run { + val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 - builder.build() - }.let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - }
\ No newline at end of file + builder.build().toRoutedMessage(topic, partition) + } + +private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String, + partition: Int, + receivedAt: Temporal = Instant.now()) = + RoutedMessage( + VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)), + topic, + Option.just(partition) + ) + |