diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index bd92c6d3..723ba39a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink class Router(private val routing: Routing, private val ctx: ClientContext) { + + constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( + routing { + kafkaSinks.forEach { + defineRoute { + fromDomain(it.name()) + toTopic(it.topicName()) + withFixedPartitioning() + } + } + }.build(), + ctx + ) + fun findDestination(message: VesMessage): Option<RoutedMessage> = routing.routeFor(message.header).map { it(message) }.also { if (it.isEmpty()) { |