diff options
author | kjaniak <kornel.janiak@nokia.com> | 2019-03-21 14:03:53 +0100 |
---|---|---|
committer | kjaniak <kornel.janiak@nokia.com> | 2019-03-26 13:05:51 +0100 |
commit | 7e77162022371860d13939be1848982a735cdab9 (patch) | |
tree | 6603c70af9029701294fd3783c9df1a1016b7c8f /sources | |
parent | c979ea3bd6059cb067a84ba8e6f8d1cf96d61ba2 (diff) |
Use DataStream API from CBS client
Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb
Issue-ID: DCAEGEN2-1346
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources')
11 files changed, 176 insertions, 140 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index 823f671a..e7134e18 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -119,6 +119,10 @@ <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> + <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>cbs-client</artifactId> + </dependency> </dependencies> </project> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 782d2324..f475a0eb 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -27,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.Flux interface Sink { @@ -48,5 +48,5 @@ interface SinkProvider : Closeable { } interface ConfigurationProvider { - operator fun invoke(): Flux<Routing> + operator fun invoke(): Flux<Sequence<KafkaSink>> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index c08df748..c674ef36 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -37,6 +36,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import java.util.concurrent.atomic.AtomicReference /** @@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Routing>() + val config = AtomicReference<Sequence<KafkaSink>>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -71,12 +71,12 @@ class CollectorFactory(private val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, ctx), + router = Router(kafkaSinks, ctx), sink = sinkProvider(ctx), metrics = metrics) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index bd92c6d3..723ba39a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink class Router(private val routing: Routing, private val ctx: ClientContext) { + + constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this( + routing { + kafkaSinks.forEach { + defineRoute { + fromDomain(it.name()) + toTopic(it.topicName()) + withFixedPartitioning() + } + } + }.build(), + ctx + ) + fun findDestination(message: VesMessage): Option<RoutedMessage> = routing.routeFor(message.header).map { it(message) }.also { if (it.isEmpty()) { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index f96350ac..5b0dca2d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -22,8 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -31,6 +29,10 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -46,6 +48,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie private val firstRequestDelay: Duration, private val requestInterval: Duration, private val healthState: HealthState, + private val streamParser: StreamFromGsonParser<KafkaSink>, retrySpec: Retry<Any> ) : ConfigurationProvider { @@ -54,6 +57,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie params.firstRequestDelay, params.requestInterval, HealthState.INSTANCE, + StreamFromGsonParsers.kafkaSinkParser(), Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval) @@ -67,7 +71,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<Routing> = + override fun invoke(): Flux<Sequence<KafkaSink>> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +79,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), firstRequestDelay, requestInterval) @@ -85,31 +89,18 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .retryWhen(retry) - private fun createCollectorConfiguration(configuration: JsonObject): Routing = - try { - val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject - defineRoute { - fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - - private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString + private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> { + try { + val sinks = DataStreams.namedSinks(configuration) + .filter { it.type() == "kafka" } + return sinks.map(streamParser::unsafeParse).asSequence() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } + } companion object { - private const val ROUTING_CONFIGURATION_KEY = "collector.routing" - private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" - private const val TOPIC_CONFIGURATION_KEY = "toTopic" - private const val MAX_RETRIES = 5L private val logger = Logger(ConfigurationProviderImpl::class) } diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index f830f2c9..e71250ca 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials import reactor.core.publisher.Flux - import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - - val route1 = it.routes[0] - assertThat(FAULT.domainName) - .describedAs("routed domain 1") - .isEqualTo(route1.domain) - assertThat("test-topic-1") - .describedAs("target topic 1") - .isEqualTo(route1.targetTopic) - - val route2 = it.routes[1] - assertThat(HEARTBEAT.domainName) - .describedAs("routed domain 2") - .isEqualTo(route2.domain) - assertThat("test-topic-2") - .describedAs("target topic 2") - .isEqualTo(route2.targetTopic) - + val receivedSink1 = it.elementAt(0) + val receivedSink2 = it.elementAt(1) + + assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) + assertThat(receivedSink1.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") + assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + + assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) + assertThat(receivedSink2.bootstrapServers()) + .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") + assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") }.verifyComplete() } } @@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({ }) +private val aafCredentials1 = ImmutableAafCredentials.builder() + .username("client") + .password("very secure password") + .build() + +private val aafCredentials2 = ImmutableAafCredentials.builder() + .username("other_client") + .password("another very secure password") + .build() private val validConfiguration = JsonParser().parse(""" { - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "fault", - "toTopic": "test-topic-1" + "streams_publishes": { + "perf3gpp_regional": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "topic_name": "REG_HVVES_PERF3GPP" + } + }, + "perf3gpp_central": { + "type": "kafka", + "aaf_credentials": { + "username": "other_client", + "password": "another very secure password" }, - { - "fromDomain": "heartbeat", - "toTopic": "test-topic-2" + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", + "topic_name": "CEN_HVVES_PERF3GPP" } - ] + } + } }""").asJsonObject private val invalidConfiguration = JsonParser().parse(""" { - "whatever": "garbage", - "collector.routing": [ - { - "fromDomain": "garbage", - "meaningful": "garbage" + "streams_publishes": { + "perf3gpp_regional": { + "type": "kafka", + "aaf_credentials": { + "username": "client", + "password": "very secure password" + }, + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", + "popic_name": "REG_HVVES_PERF3GPP" } - ] + } + } }""").asJsonObject private val firstRequestDelay = Duration.ofMillis(1) private val requestInterval = Duration.ofMillis(1) +private val streamParser = StreamFromGsonParsers.kafkaSinkParser() private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, healthState: HealthState, @@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, firstRequestDelay, requestInterval, healthState, + streamParser, retry ) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index bd056d4d..a6b32ed9 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(basicRouting) + val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index ece42285..50fe098c 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e84e9486..da9290d3 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -38,7 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -101,17 +103,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } -fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = +fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } -fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(routing) + configurationProvider.updateConfiguration(kafkaSinks) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 17f6ce32..21c5c189 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -152,7 +152,7 @@ object VesHvSpecification : Spek({ it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -205,7 +205,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,12 +213,12 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(emptyRouting) + sut.configurationProvider.updateConfiguration(configWithEmptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) @@ -242,7 +242,7 @@ object VesHvSpecification : Spek({ .isEqualTo(0) - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) @@ -261,7 +261,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -287,7 +287,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + sut.configurationProvider.updateConfiguration(configWithDifferentRouting) println("config changed") } } @@ -320,7 +320,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +349,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + sut.configurationProvider.updateConfiguration(configWithBasicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 1ad2b0e3..213eff27 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,11 +20,12 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" - -val basicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val twoDomainsToOneTopicRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } -}.build() - - -val configurationWithDifferentRouting = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } -}.build() - - -val emptyRouting = routing { }.build() +const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" + +val configWithBasicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithTwoDomainsToOneTopicRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(HEARTBEAT.domainName) + .topicName(PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build(), + ImmutableKafkaSink.builder() + .name(MEASUREMENT.domainName) + .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() +) + +val configWithDifferentRouting = sequenceOf( + ImmutableKafkaSink.builder() + .name(PERF3GPP.domainName) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) + .build() + ) + +val configWithEmptyRouting = emptySequence<KafkaSink>() class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() + private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create() - fun updateConfiguration(routing: Routing) = + fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(routing) + configStream.onNext(kafkaSinkSequence) } |