aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt16
1 files changed, 16 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index bd92c6d3..723ba39a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
class Router(private val routing: Routing, private val ctx: ClientContext) {
+
+ constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
+ routing {
+ kafkaSinks.forEach {
+ defineRoute {
+ fromDomain(it.name())
+ toTopic(it.topicName())
+ withFixedPartitioning()
+ }
+ }
+ }.build(),
+ ctx
+ )
+
fun findDestination(message: VesMessage): Option<RoutedMessage> =
routing.routeFor(message.header).map { it(message) }.also {
if (it.isEmpty()) {