summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt55
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt36
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt9
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt23
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt5
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt2
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt2
7 files changed, 37 insertions, 95 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
index aab8ecad..45180a84 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
@@ -19,59 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.config.api.model
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
-data class Routing(val routes: List<Route>) {
+data class Routing(val routes: List<Route>)
- fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) })
-}
-
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
-
- fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
-
- operator fun invoke(message: VesMessage): RoutedMessage =
- RoutedMessage(targetTopic, partitioning(message.header), message)
-}
-
-
-/*
-HvVesConfiguration DSL
-*/
-
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
-
-class RoutingBuilder {
- private val routes: MutableList<RouteBuilder> = mutableListOf()
-
- fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
- .apply(init)
- .also { routes.add(it) }
-
- fun build() = Routing(routes.map { it.build() }.toList())
-}
-
-class RouteBuilder {
-
- private lateinit var domain: String
- private lateinit var targetTopic: String
- private lateinit var partitioning: (CommonEventHeader) -> Int
-
- fun fromDomain(domain: String): RouteBuilder = apply {
- this.domain = domain
- }
-
- fun toTopic(targetTopic: String): RouteBuilder = apply {
- this.targetTopic = targetTopic
- }
-
- fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
- partitioning = { num }
- }
-
- fun build() = Route(domain, targetTopic, partitioning)
-}
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index d2c35cbb..a95a44d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,36 +20,36 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
class Router(private val routing: Routing, private val ctx: ClientContext) {
constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
- routing {
- kafkaSinks.forEach {
- defineRoute {
- fromDomain(it.name())
- toTopic(it.topicName())
- withFixedPartitioning()
- }
- }
- }.build(),
- ctx
- )
+ Routing(
+ kafkaSinks.map { Route(it.name(), it.topicName()) }.toList()
+ ),
+ ctx)
fun findDestination(message: VesMessage): Option<RoutedMessage> =
- routing.routeFor(message.header).map { it(message) }.also {
- if (it.isEmpty()) {
- logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
- }
- }
+ routeFor(message.header)
+ .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } }
+ .map { it.routeMessage(message) }
+
+ private fun Route.routeMessage(message: VesMessage) =
+ RoutedMessage(targetTopic, partitioning(message.header), message)
+
+ private fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+ routing.routes.find { it.domain == commonHeader.domain }.toOption()
companion object {
private val logger = Logger(Routing::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index f9fd6986..1f5df371 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -27,14 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -94,13 +94,14 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
try {
DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
+ .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
.map(streamParser::unsafeParse)
.asSequence()
} catch (e: NullPointerException) {
throw ParsingException("Failed to parse configuration", e)
}
+
companion object {
private const val MAX_RETRIES = 5L
private val logger = Logger(ConfigurationProviderImpl::class)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 96298167..b8b55865 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,7 +26,8 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
@@ -43,20 +44,10 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
*/
object RouterTest : Spek({
given("sample configuration") {
- val config = routing {
-
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic("ves_rtpm")
- withFixedPartitioning(2)
- }
-
- defineRoute {
- fromDomain(SYSLOG.domainName)
- toTopic("ves_trace")
- withFixedPartitioning()
- }
- }.build()
+ val config = Routing(listOf(
+ Route(PERF3GPP.domainName, "ves_rtpm", { 2 }),
+ Route(SYSLOG.domainName, "ves_trace")
+ ))
val cut = Router(config, ClientContext())
on("message with existing route (rtpm)") {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index 8ea53fbe..eb0a3173 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.ves.VesEventOuterClass
@@ -45,7 +45,8 @@ internal object KafkaSinkProviderTest : Spek({
val config = CollectorConfiguration(
maxRequestSizeBytes = 1024 * 1024,
kafkaServers = "localhost:9090",
- routing = routing { }.build())
+ routing = Routing(emptyList())
+ )
val cut = KafkaSinkProvider(config)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index d965b787..109915a1 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 6425601e..a398967d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.