diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-26 13:57:11 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-27 12:44:17 +0100 |
commit | d6e646205cf290f46e980ad2470225c7d0b42618 (patch) | |
tree | 0f441ca56396dcb3fb455b849f5ddd8f421167a2 /sources/hv-collector-core | |
parent | 2b748ea515290984c5657e4d4a1027ff2e90bd61 (diff) |
Move routing functionality inside Router
- also removed Routing-DSL as it won't be needed anymore
Change-Id: Ifc7bc7641a60936b5257c0bff7a8c51dddc30687
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
4 files changed, 33 insertions, 40 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index d2c35cbb..a95a44d5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,36 +20,36 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import arrow.core.toOption +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader class Router(private val routing: Routing, private val ctx: ClientContext) { constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( - routing { - kafkaSinks.forEach { - defineRoute { - fromDomain(it.name()) - toTopic(it.topicName()) - withFixedPartitioning() - } - } - }.build(), - ctx - ) + Routing( + kafkaSinks.map { Route(it.name(), it.topicName()) }.toList() + ), + ctx) fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) }.also { - if (it.isEmpty()) { - logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } - } - } + routeFor(message.header) + .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } } + .map { it.routeMessage(message) } + + private fun Route.routeMessage(message: VesMessage) = + RoutedMessage(targetTopic, partitioning(message.header), message) + + private fun routeFor(commonHeader: CommonEventHeader): Option<Route> = + routing.routes.find { it.domain == commonHeader.domain }.toOption() companion object { private val logger = Logger(Routing::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index f9fd6986..1f5df371 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -27,14 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA +import org.onap.dcaegen2.services.sdk.model.streams.StreamType +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -94,13 +94,14 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> = try { DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) + .filter(StreamPredicates.streamOfType(StreamType.KAFKA)) .map(streamParser::unsafeParse) .asSequence() } catch (e: NullPointerException) { throw ParsingException("Failed to parse configuration", e) } + companion object { private const val MAX_RETRIES = 5L private val logger = Logger(ConfigurationProviderImpl::class) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 96298167..b8b55865 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,8 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG @@ -43,20 +44,10 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame */ object RouterTest : Spek({ given("sample configuration") { - val config = routing { - - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic("ves_rtpm") - withFixedPartitioning(2) - } - - defineRoute { - fromDomain(SYSLOG.domainName) - toTopic("ves_trace") - withFixedPartitioning() - } - }.build() + val config = Routing(listOf( + Route(PERF3GPP.domainName, "ves_rtpm", { 2 }), + Route(SYSLOG.domainName, "ves_trace") + )) val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 8ea53fbe..eb0a3173 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.ves.VesEventOuterClass @@ -45,7 +45,8 @@ internal object KafkaSinkProviderTest : Spek({ val config = CollectorConfiguration( maxRequestSizeBytes = 1024 * 1024, kafkaServers = "localhost:9090", - routing = routing { }.build()) + routing = Routing(emptyList()) + ) val cut = KafkaSinkProvider(config) |