From 7e77162022371860d13939be1848982a735cdab9 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Thu, 21 Mar 2019 14:03:53 +0100 Subject: Use DataStream API from CBS client Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb Issue-ID: DCAEGEN2-1346 Signed-off-by: kjaniak --- sources/hv-collector-core/pom.xml | 4 + .../dcae/collectors/veshv/boundary/adapters.kt | 4 +- .../collectors/veshv/factory/CollectorFactory.kt | 8 +- .../org/onap/dcae/collectors/veshv/impl/Router.kt | 16 ++++ .../impl/adapters/ConfigurationProviderImpl.kt | 43 ++++------ .../impl/adapters/ConfigurationProviderTest.kt | 94 ++++++++++++++-------- 6 files changed, 102 insertions(+), 67 deletions(-) (limited to 'sources/hv-collector-core') diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index 823f671a..e7134e18 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -119,6 +119,10 @@ io.projectreactor.kafka reactor-kafka + + org.onap.dcaegen2.services.sdk.rest.services + cbs-client + diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 782d2324..f475a0eb 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -27,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.Flux interface Sink { @@ -48,5 +48,5 @@ interface SinkProvider : Closeable { } interface ConfigurationProvider { - operator fun invoke(): Flux + operator fun invoke(): Flux> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index c08df748..c674ef36 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -37,6 +36,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference() + val config = AtomicReference>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,12 +71,12 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(kafkaSinks: Sequence, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, ctx), + router = Router(kafkaSinks, ctx), sink = sinkProvider(ctx), metrics = metrics) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index bd92c6d3..723ba39a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink class Router(private val routing: Routing, private val ctx: ClientContext) { + + constructor(kafkaSinks: Sequence, ctx: ClientContext) : this( + routing { + kafkaSinks.forEach { + defineRoute { + fromDomain(it.name()) + toTopic(it.topicName()) + withFixedPartitioning() + } + } + }.build(), + ctx + ) + fun findDestination(message: VesMessage): Option = routing.routeFor(message.header).map { it(message) }.also { if (it.isEmpty()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index f96350ac..5b0dca2d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,8 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -31,6 +29,10 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -46,6 +48,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono, retrySpec: Retry ) : ConfigurationProvider { @@ -54,6 +57,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval) @@ -67,7 +71,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono = + override fun invoke(): Flux> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +79,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux> = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) @@ -85,31 +89,18 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono { + try { + val sinks = DataStreams.namedSinks(configuration) + .filter { it.type() == "kafka" } + return sinks.map(streamParser::unsafeParse).asSequence() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } + } companion object { - private const val ROUTING_CONFIGURATION_KEY = "collector.routing" - private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" - private const val TOPIC_CONFIGURATION_KEY = "toTopic" - private const val MAX_RETRIES = 5L private val logger = Logger(ConfigurationProviderImpl::class) } diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index f830f2c9..e71250ca 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials import reactor.core.publisher.Flux - import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - - val route1 = it.routes[0] - assertThat(FAULT.domainName) - .describedAs("routed domain 1") - .isEqualTo(route1.domain) - assertThat("test-topic-1") - .describedAs("target topic 1") - .isEqualTo(route1.targetTopic) - - val route2 = it.routes[1] - assertThat(HEARTBEAT.domainName) - .describedAs("routed domain 2") - .isEqualTo(route2.domain) - assertThat("test-topic-2") - .describedAs("target topic 2") - .isEqualTo(route2.targetTopic) - + val receivedSink1 = it.elementAt(0) + val receivedSink2 = it.elementAt(1) + + assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(receivedSink1.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") + assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + + assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(receivedSink2.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") + assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") }.verifyComplete() } } @@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({ }) +private val aafCredentials1 = ImmutableAafCredentials.builder() + .username("client") + .password("very secure password") + .build() + +private val aafCredentials2 = ImmutableAafCredentials.builder() + .username("other_client") + .password("another very secure password") + .build() private val validConfiguration = JsonParser().parse(""" { - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "fault", - "toTopic": "test-topic-1" + "streams_publishes": { + "perf3gpp_regional": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "topic_name": "REG_HVVES_PERF3GPP" + } + }, + "perf3gpp_central": { + "type": "kafka", + "aaf_credentials": { + "username": "other_client", + "password": "another very secure password" }, - { - "fromDomain": "heartbeat", - "toTopic": "test-topic-2" + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", + "topic_name": "CEN_HVVES_PERF3GPP" } - ] + } + } }""").asJsonObject private val invalidConfiguration = JsonParser().parse(""" { - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "garbage", - "meaningful": "garbage" + "streams_publishes": { + "perf3gpp_regional": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "popic_name": "REG_HVVES_PERF3GPP" } - ] + } + } }""").asJsonObject private val firstRequestDelay = Duration.ofMillis(1) private val requestInterval = Duration.ofMillis(1) +private val streamParser = StreamFromGsonParsers.kafkaSinkParser() private fun constructConfigurationProvider(cbsClientMono: Mono, healthState: HealthState, @@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono, firstRequestDelay, requestInterval, healthState, + streamParser, retry ) } -- cgit 1.2.3-korg