aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt25
1 files changed, 16 insertions, 9 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index 6e2e20f7..b03b89e1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing,
fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .fold({
- metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
- logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
- logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
- Flux.empty<Route>()
- }, {
- logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
- Flux.just(it)
- })
+ .fold({ routeNotFound(message) }, { routeFound(message, it) })
.flatMap {
val sinkTopic = it.sink.topicName()
messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
}
+ private fun routeNotFound(message: VesMessage): Flux<Route> {
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
+ return Flux.empty<Route>()
+ }
+
+ private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
+ logger.trace(ctx::fullMdc) {
+ "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
+ }
+ return Flux.just(route)
+ }
+
+
private fun routeFor(header: CommonEventHeader) =
routing.find { it.domain == header.domain }.toOption()