aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-core/pom.xml4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt43
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt94
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt14
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt6
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt16
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt26
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt85
11 files changed, 176 insertions, 140 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index 823f671a..e7134e18 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -119,6 +119,10 @@
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 782d2324..f475a0eb 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -27,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.Flux
interface Sink {
@@ -48,5 +48,5 @@ interface SinkProvider : Closeable {
}
interface ConfigurationProvider {
- operator fun invoke(): Flux<Routing>
+ operator fun invoke(): Flux<Sequence<KafkaSink>>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index c08df748..c674ef36 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -37,6 +36,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import java.util.concurrent.atomic.AtomicReference
/**
@@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Routing>()
+ val config = AtomicReference<Sequence<KafkaSink>>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,12 +71,12 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(routing, ctx),
+ router = Router(kafkaSinks, ctx),
sink = sinkProvider(ctx),
metrics = metrics)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index bd92c6d3..723ba39a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
class Router(private val routing: Routing, private val ctx: ClientContext) {
+
+ constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
+ routing {
+ kafkaSinks.forEach {
+ defineRoute {
+ fromDomain(it.name())
+ toTopic(it.topicName())
+ withFixedPartitioning()
+ }
+ }
+ }.build(),
+ ctx
+ )
+
fun findDestination(message: VesMessage): Option<RoutedMessage> =
routing.routeFor(message.header).map { it(message) }.also {
if (it.isEmpty()) {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index f96350ac..5b0dca2d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -22,8 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -31,6 +29,10 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -46,6 +48,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
private val healthState: HealthState,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
retrySpec: Retry<Any>
) : ConfigurationProvider {
@@ -54,6 +57,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
params.firstRequestDelay,
params.requestInterval,
HealthState.INSTANCE,
+ StreamFromGsonParsers.kafkaSinkParser(),
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval)
@@ -67,7 +71,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<Routing> =
+ override fun invoke(): Flux<Sequence<KafkaSink>> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +79,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
firstRequestDelay,
requestInterval)
@@ -85,31 +89,18 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.retryWhen(retry)
- private fun createCollectorConfiguration(configuration: JsonObject): Routing =
- try {
- val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject
- defineRoute {
- fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
- private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+ private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> {
+ try {
+ val sinks = DataStreams.namedSinks(configuration)
+ .filter { it.type() == "kafka" }
+ return sinks.map(streamParser::unsafeParse).asSequence()
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
+ }
companion object {
- private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
- private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
- private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
private const val MAX_RETRIES = 5L
private val logger = Logger(ConfigurationProviderImpl::class)
}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index f830f2c9..e71250ca 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials
import reactor.core.publisher.Flux
-
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
-
- val route1 = it.routes[0]
- assertThat(FAULT.domainName)
- .describedAs("routed domain 1")
- .isEqualTo(route1.domain)
- assertThat("test-topic-1")
- .describedAs("target topic 1")
- .isEqualTo(route1.targetTopic)
-
- val route2 = it.routes[1]
- assertThat(HEARTBEAT.domainName)
- .describedAs("routed domain 2")
- .isEqualTo(route2.domain)
- assertThat("test-topic-2")
- .describedAs("target topic 2")
- .isEqualTo(route2.targetTopic)
-
+ val receivedSink1 = it.elementAt(0)
+ val receivedSink2 = it.elementAt(1)
+
+ assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
+ assertThat(receivedSink1.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
+ assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+
+ assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
+ assertThat(receivedSink2.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
+ assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
}.verifyComplete()
}
}
@@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({
})
+private val aafCredentials1 = ImmutableAafCredentials.builder()
+ .username("client")
+ .password("very secure password")
+ .build()
+
+private val aafCredentials2 = ImmutableAafCredentials.builder()
+ .username("other_client")
+ .password("another very secure password")
+ .build()
private val validConfiguration = JsonParser().parse("""
{
- "whatever": "garbage",
- "collector.routing": [
- {
- "fromDomain": "fault",
- "toTopic": "test-topic-1"
+ "streams_publishes": {
+ "perf3gpp_regional": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "topic_name": "REG_HVVES_PERF3GPP"
+ }
+ },
+ "perf3gpp_central": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "other_client",
+ "password": "another very secure password"
},
- {
- "fromDomain": "heartbeat",
- "toTopic": "test-topic-2"
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
+ "topic_name": "CEN_HVVES_PERF3GPP"
}
- ]
+ }
+ }
}""").asJsonObject
private val invalidConfiguration = JsonParser().parse("""
{
- "whatever": "garbage",
- "collector.routing": [
- {
- "fromDomain": "garbage",
- "meaningful": "garbage"
+ "streams_publishes": {
+ "perf3gpp_regional": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "popic_name": "REG_HVVES_PERF3GPP"
}
- ]
+ }
+ }
}""").asJsonObject
private val firstRequestDelay = Duration.ofMillis(1)
private val requestInterval = Duration.ofMillis(1)
+private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
healthState: HealthState,
@@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
firstRequestDelay,
requestInterval,
healthState,
+ streamParser,
retry
)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index bd056d4d..a6b32ed9 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(basicRouting)
+ val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index ece42285..50fe098c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e84e9486..da9290d3 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -38,7 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -101,17 +103,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(routing)
+ configurationProvider.updateConfiguration(kafkaSinks)
}
-fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(routing)
+ configurationProvider.updateConfiguration(kafkaSinks)
}
-fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(routing)
+ configurationProvider.updateConfiguration(kafkaSinks)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 17f6ce32..21c5c189 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -152,7 +152,7 @@ object VesHvSpecification : Spek({
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+ sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -205,7 +205,7 @@ object VesHvSpecification : Spek({
it("should update collector") {
val firstCollector = sut.collector
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
val collectorAfterUpdate = sut.collector
assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,12 +213,12 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(emptyRouting)
+ sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
@@ -242,7 +242,7 @@ object VesHvSpecification : Spek({
.isEqualTo(0)
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
@@ -261,7 +261,7 @@ object VesHvSpecification : Spek({
Flux.range(0, messagesAmount).doOnNext {
if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
}
}.doOnNext {
sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
val incomingMessages = Flux.range(0, messageStreamSize)
.doOnNext {
if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
println("config changed")
}
}
@@ -320,7 +320,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +349,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 1ad2b0e3..213eff27 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,11 +20,12 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
@@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException
const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-
-val basicRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val twoDomainsToOneTopicRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(HEARTBEAT.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(MEASUREMENT.domainName)
- toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val configurationWithDifferentRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(ALTERNATE_PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val emptyRouting = routing { }.build()
+const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
+
+val configWithBasicRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
+
+val configWithTwoDomainsToOneTopicRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build(),
+ ImmutableKafkaSink.builder()
+ .name(HEARTBEAT.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build(),
+ ImmutableKafkaSink.builder()
+ .name(MEASUREMENT.domainName)
+ .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
+
+val configWithDifferentRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+ )
+
+val configWithEmptyRouting = emptySequence<KafkaSink>()
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
- fun updateConfiguration(routing: Routing) =
+ fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(routing)
+ configStream.onNext(kafkaSinkSequence)
}