aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-26 13:57:11 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-27 12:44:17 +0100
commitd6e646205cf290f46e980ad2470225c7d0b42618 (patch)
tree0f441ca56396dcb3fb455b849f5ddd8f421167a2 /sources/hv-collector-core/src/main
parent2b748ea515290984c5657e4d4a1027ff2e90bd61 (diff)
Move routing functionality inside Router
- also removed Routing-DSL as it won't be needed anymore Change-Id: Ifc7bc7641a60936b5257c0bff7a8c51dddc30687 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt36
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt9
2 files changed, 23 insertions, 22 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index d2c35cbb..a95a44d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,36 +20,36 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
class Router(private val routing: Routing, private val ctx: ClientContext) {
constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
- routing {
- kafkaSinks.forEach {
- defineRoute {
- fromDomain(it.name())
- toTopic(it.topicName())
- withFixedPartitioning()
- }
- }
- }.build(),
- ctx
- )
+ Routing(
+ kafkaSinks.map { Route(it.name(), it.topicName()) }.toList()
+ ),
+ ctx)
fun findDestination(message: VesMessage): Option<RoutedMessage> =
- routing.routeFor(message.header).map { it(message) }.also {
- if (it.isEmpty()) {
- logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
- }
- }
+ routeFor(message.header)
+ .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } }
+ .map { it.routeMessage(message) }
+
+ private fun Route.routeMessage(message: VesMessage) =
+ RoutedMessage(targetTopic, partitioning(message.header), message)
+
+ private fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+ routing.routes.find { it.domain == commonHeader.domain }.toOption()
companion object {
private val logger = Logger(Routing::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index f9fd6986..1f5df371 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -27,14 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -94,13 +94,14 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
try {
DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
+ .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
.map(streamParser::unsafeParse)
.asSequence()
} catch (e: NullPointerException) {
throw ParsingException("Failed to parse configuration", e)
}
+
companion object {
private const val MAX_RETRIES = 5L
private val logger = Logger(ConfigurationProviderImpl::class)