diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index 6e2e20f7..b03b89e1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing, fun route(message: VesMessage): Flux<ConsumedMessage> = routeFor(message.header) - .fold({ - metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) - logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } - logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" } - Flux.empty<Route>() - }, { - logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" } - Flux.just(it) - }) + .fold({ routeNotFound(message) }, { routeFound(message, it) }) .flatMap { val sinkTopic = it.sink.topicName() messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION)) } + private fun routeNotFound(message: VesMessage): Flux<Route> { + metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND) + logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" } + logger.trace(ctx::fullMdc) { "Routing available for client: $routing" } + return Flux.empty<Route>() + } + + private fun routeFound(message: VesMessage, route: Route): Flux<Route> { + logger.trace(ctx::fullMdc) { + "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION" + } + return Flux.just(route) + } + + private fun routeFor(header: CommonEventHeader) = routing.find { it.domain == header.domain }.toOption() |