aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt22
1 files changed, 15 insertions, 7 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index c4e877bf..dfa40006 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2021 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,15 +20,17 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
+import arrow.core.Option
import arrow.core.toOption
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -36,9 +38,9 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
internal class Router internal constructor(private val routing: Routing,
- private val messageSinks: Map<String, Lazy<Sink>>,
- private val ctx: ClientContext,
- private val metrics: Metrics) {
+ private val messageSinks: Map<String, Lazy<Sink>>,
+ private val ctx: ClientContext,
+ private val metrics: Metrics) {
constructor(routing: Routing,
sinkFactory: SinkFactory,
ctx: ClientContext,
@@ -70,8 +72,14 @@ internal class Router internal constructor(private val routing: Routing,
}
- private fun routeFor(header: CommonEventHeader) =
- routing.find { it.domain == header.domain }.toOption()
+ private fun routeFor(header: CommonEventHeader): Option<Route> =
+ routing.find {
+ if (header.domain == VesEventDomain.STND_DEFINED.domainName)
+ it.domain == header.stndDefinedNamespace
+ else {
+ it.domain == header.domain
+ }
+ }.toOption()
private fun messageSinkFor(sinkTopic: String) = messageSinks
.getOrElse(sinkTopic) {