diff options
7 files changed, 37 insertions, 95 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt index aab8ecad..45180a84 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt @@ -19,59 +19,8 @@ */ package org.onap.dcae.collectors.veshv.config.api.model -import arrow.core.Option -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader -data class Routing(val routes: List<Route>) { +data class Routing(val routes: List<Route>) - fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) -} - -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) { - - fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain - - operator fun invoke(message: VesMessage): RoutedMessage = - RoutedMessage(targetTopic, partitioning(message.header), message) -} - - -/* -HvVesConfiguration DSL -*/ - -fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init) - -class RoutingBuilder { - private val routes: MutableList<RouteBuilder> = mutableListOf() - - fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder() - .apply(init) - .also { routes.add(it) } - - fun build() = Routing(routes.map { it.build() }.toList()) -} - -class RouteBuilder { - - private lateinit var domain: String - private lateinit var targetTopic: String - private lateinit var partitioning: (CommonEventHeader) -> Int - - fun fromDomain(domain: String): RouteBuilder = apply { - this.domain = domain - } - - fun toTopic(targetTopic: String): RouteBuilder = apply { - this.targetTopic = targetTopic - } - - fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply { - partitioning = { num } - } - - fun build() = Route(domain, targetTopic, partitioning) -} +data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 })
\ No newline at end of file diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index d2c35cbb..a95a44d5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,36 +20,36 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import arrow.core.toOption +import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.ves.VesEventOuterClass.CommonEventHeader class Router(private val routing: Routing, private val ctx: ClientContext) { constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( - routing { - kafkaSinks.forEach { - defineRoute { - fromDomain(it.name()) - toTopic(it.topicName()) - withFixedPartitioning() - } - } - }.build(), - ctx - ) + Routing( + kafkaSinks.map { Route(it.name(), it.topicName()) }.toList() + ), + ctx) fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) }.also { - if (it.isEmpty()) { - logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } - } - } + routeFor(message.header) + .doOnEmpty { logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" } } + .map { it.routeMessage(message) } + + private fun Route.routeMessage(message: VesMessage) = + RoutedMessage(targetTopic, partitioning(message.header), message) + + private fun routeFor(commonHeader: CommonEventHeader): Option<Route> = + routing.routes.find { it.domain == commonHeader.domain }.toOption() companion object { private val logger = Logger(Routing::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index f9fd6986..1f5df371 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -27,14 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA +import org.onap.dcaegen2.services.sdk.model.streams.StreamType +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -94,13 +94,14 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> = try { DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) + .filter(StreamPredicates.streamOfType(StreamType.KAFKA)) .map(streamParser::unsafeParse) .asSequence() } catch (e: NullPointerException) { throw ParsingException("Failed to parse configuration", e) } + companion object { private const val MAX_RETRIES = 5L private val logger = Logger(ConfigurationProviderImpl::class) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 96298167..b8b55865 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,8 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG @@ -43,20 +44,10 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame */ object RouterTest : Spek({ given("sample configuration") { - val config = routing { - - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic("ves_rtpm") - withFixedPartitioning(2) - } - - defineRoute { - fromDomain(SYSLOG.domainName) - toTopic("ves_trace") - withFixedPartitioning() - } - }.build() + val config = Routing(listOf( + Route(PERF3GPP.domainName, "ves_rtpm", { 2 }), + Route(SYSLOG.domainName, "ves_trace") + )) val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 8ea53fbe..eb0a3173 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.routing +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.ves.VesEventOuterClass @@ -45,7 +45,8 @@ internal object KafkaSinkProviderTest : Spek({ val config = CollectorConfiguration( maxRequestSizeBytes = 1024 * 1024, kafkaServers = "localhost:9090", - routing = routing { }.build()) + routing = Routing(emptyList()) + ) val cut = KafkaSinkProvider(config) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index d965b787..109915a1 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 6425601e..a398967d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. |