aboutsummaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-configuration/pom.xml5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt32
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt)8
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt)2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt)0
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt)0
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt)85
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt)2
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt)45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt)11
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt16
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt7
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt29
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt155
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt37
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt46
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt18
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt8
23 files changed, 159 insertions, 395 deletions
diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml
index 792b9eaa..b6ec4ca2 100644
--- a/sources/hv-collector-configuration/pom.xml
+++ b/sources/hv-collector-configuration/pom.xml
@@ -77,7 +77,10 @@
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>cbs-client</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index dd1b171e..9684484b 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
-import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
+import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
class ConfigurationModule {
@@ -34,16 +40,28 @@ class ConfigurationModule {
private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
- private lateinit var initialConfig: HvVesConfiguration
-
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
- fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
+ fun hvVesConfigurationUpdates(args: Array<String>,
+ configStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
Flux.just(cmd.getConfigurationFile(args))
.throwOnLeft { MissingArgumentException(it.message, it.cause) }
.map { it.reader().use(configReader::loadConfig) }
- .map { configValidator.validate(it) }
- .throwOnLeft { ValidationException(it.message) }
- .doOnNext { initialConfig = it }
+ .cache()
+ .flatMap { basePartialConfig ->
+ val baseConfig = configValidator.validate(basePartialConfig)
+ .rightOrThrow { ValidationException(it.message) }
+ val cbsConfigProvider = CbsConfigurationProvider(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ baseConfig.cbs,
+ configStateListener,
+ mdc)
+ val merger = ConfigurationMerger()
+ cbsConfigProvider()
+ .map { merger.merge(basePartialConfig, it) }
+ .map { configValidator.validate(it) }
+ .throwOnLeft { ValidationException(it.message) }
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt
index 2b123fc8..9fa6660e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.config.api
-class ParsingException(message: String, cause: Throwable) : Exception(message, cause)
+interface ConfigurationStateListener {
+ fun retrying() {}
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index 3375821e..c1807be2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -47,7 +47,5 @@ data class CbsConfiguration(
)
data class CollectorConfiguration(
- val maxRequestSizeBytes: Int,
- val kafkaServers: String,
val routing: Routing
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
index 2fc29829..2fc29829 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
index e5a83ac4..e5a83ac4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
index 185693c0..2038c31a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
@@ -17,17 +17,18 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.config.impl
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
import com.google.gson.JsonObject
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
@@ -42,69 +43,77 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
import reactor.retry.Retry
-import java.time.Duration
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- private val streamParser: StreamFromGsonParser<KafkaSink>,
- retrySpec: Retry<Any>
+internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
+ private val cbsConfiguration: CbsConfiguration,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
+ private val configurationStateListener: ConfigurationStateListener,
+ retrySpec: Retry<Any>,
+ private val mdc: MappedDiagnosticContext
-) : ConfigurationProvider {
- constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
- cbsClientMono,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- StreamFromGsonParsers.kafkaSinkParser(),
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval)
- .jitter(Jitter.random())
- )
+) {
+ constructor(cbsClientMono: Mono<CbsClient>,
+ cbsConfig: CbsConfiguration,
+ configurationStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext) :
+ this(
+ cbsClientMono,
+ cbsConfig,
+ StreamFromGsonParsers.kafkaSinkParser(),
+ configurationStateListener,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(cbsConfig.requestInterval)
+ .jitter(Jitter.random()),
+ mdc
+ )
private val retry = retrySpec.doOnRetry {
- logger.withWarn(ServiceContext::mdc) {
+ logger.withWarn(mdc) {
log("Exception from configuration provider client, retrying subscription", it.exception())
}
- healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ configurationStateListener.retrying()
}
- override fun invoke(): Flux<Routing> =
+ operator fun invoke(): Flux<PartialConfiguration> =
cbsClientMono
- .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
- .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
+ .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
.retryWhen(retry)
- .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
+ .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
private fun handleUpdates(cbsClient: CbsClient) = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
- firstRequestDelay,
- requestInterval)
- .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
+ cbsConfiguration.firstRequestDelay,
+ cbsConfiguration.requestInterval)
+ .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
.map(::createRoutingDescription)
- .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
+ .onErrorLog(logger, mdc) { "Error while creating configuration" }
.retryWhen(retry)
+ .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) }
- private fun createRoutingDescription(configuration: JsonObject): Routing = try {
- DataStreams.namedSinks(configuration)
+ private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
+ val routes = DataStreams.namedSinks(configuration)
.filter(streamOfType(KAFKA))
.map(streamParser::unsafeParse)
.map { Route(it.name(), it) }
.asIterable()
.toList()
+ Some(routes)
} catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
+ logger.withWarn(mdc) {
+ log("Invalid streams configuration", e)
+ }
+ None
}
companion object {
private const val MAX_RETRIES = 5L
- private val logger = Logger(ConfigurationProviderImpl::class)
+ private val logger = Logger(CbsConfigurationProvider::class)
}
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 04bba7e2..3e599b58 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -63,9 +63,7 @@ internal class ConfigurationValidator {
securityConfiguration,
// TOD0: swap when ConfigurationMerger is implemented
// collectorConfiguration
- CollectorConfiguration(-1,
- "I do not exist. I'm not even a URL :o",
- emptyList()),
+ CollectorConfiguration(emptyList()),
// end TOD0
logLevel
)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index 3e93a400..c1a98294 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -19,7 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.*
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.getOrElse
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.CommandLineParser
import org.apache.commons.cli.DefaultParser
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index a27998e1..f3c149cd 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -54,6 +54,6 @@ internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
internal data class PartialCollectorConfig(
val maxRequestSizeBytes: Option<Int> = None,
- val kafkaServers: Option<List<InetSocketAddress>> = None,
+ val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part
val routing: Option<Routing> = None
)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
index 8616ce03..0cbc0e4a 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -17,12 +17,14 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.config.impl
import com.google.gson.JsonParser
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
@@ -30,13 +32,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import reactor.core.publisher.Flux
-
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -46,16 +47,16 @@ import java.time.Duration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal object ConfigurationProviderImplTest : Spek({
+internal object CbsConfigurationProviderTest : Spek({
describe("Configuration provider") {
val cbsClient: CbsClient = mock()
val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
- val healthStateProvider = HealthState.INSTANCE
+ val configStateListener: ConfigurationStateListener = mock()
given("configuration is never in cbs") {
- val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+ val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
on("waiting for configuration") {
val waitTime = Duration.ofMillis(100)
@@ -68,7 +69,7 @@ internal object ConfigurationProviderImplTest : Spek({
}
given("valid configuration from cbs") {
- val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+ val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
on("new configuration") {
whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
@@ -77,8 +78,9 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val route1 = it.elementAt(0)
- val route2 = it.elementAt(1)
+ val routes = it.collector.orNull()!!.routing.orNull()!!
+ val route1 = routes.elementAt(0)
+ val route2 = routes.elementAt(1)
val receivedSink1 = route1.sink
val receivedSink2 = route2.sink
@@ -102,7 +104,7 @@ internal object ConfigurationProviderImplTest : Spek({
given("invalid configuration from cbs") {
val iterationCount = 3L
val configProvider = constructConfigurationProvider(
- cbsClientMock, healthStateProvider, iterationCount
+ cbsClientMock, configStateListener, iterationCount
)
on("new configuration") {
@@ -114,11 +116,8 @@ internal object ConfigurationProviderImplTest : Spek({
.verifyError()
}
- it("should update the health state") {
- StepVerifier.create(healthStateProvider().take(iterationCount))
- .expectNextCount(iterationCount - 1)
- .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- .verifyComplete()
+ it("should call state listener when retrying") {
+ verify(configStateListener, times(iterationCount.toInt())).retrying()
}
}
}
@@ -190,18 +189,18 @@ private val requestInterval = Duration.ofMillis(1)
private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
- healthState: HealthState,
+ configurationStateListener: ConfigurationStateListener,
iterationCount: Long = 1
-): ConfigurationProviderImpl {
+): CbsConfigurationProvider {
val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
- return ConfigurationProviderImpl(
+ return CbsConfigurationProvider(
cbsClientMono,
- firstRequestDelay,
- requestInterval,
- healthState,
+ CbsConfiguration(firstRequestDelay, requestInterval),
streamParser,
- retry
+ configurationStateListener,
+ retry,
+ { mapOf("k" to "v") }
)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 1b92d90c..e3156a0d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,8 +39,6 @@ interface SinkProvider : Closeable {
operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
}
-typealias ConfigurationProvider = () -> Flux<Routing>
-
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 5c64c70b..ba0a9eee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -33,7 +32,7 @@ interface Collector {
}
interface CollectorProvider : Closeable {
- operator fun invoke(ctx: ClientContext): Option<Collector>
+ operator fun invoke(ctx: ClientContext): Collector
}
interface Server {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
index 20b11753..04e575ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
@@ -17,14 +17,10 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -32,9 +28,4 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
*/
object AdapterFactory {
fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
-
- fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
- ConfigurationProviderImpl(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- config)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2b29acd9..1c79abd2 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -19,64 +19,45 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import arrow.core.Option
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.util.concurrent.atomic.AtomicReference
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(private val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: CollectorConfiguration,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int,
- private val healthState: HealthState = HealthState.INSTANCE) {
+ private val maxPayloadSizeBytes: Int) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Routing>()
- configuration()
- .doOnNext {
- logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
- healthState.changeState(HealthDescription.HEALTHY)
- }
- .doOnError {
- logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
- logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
- healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
- }
- .subscribe(config::set)
return object : CollectorProvider {
- override fun invoke(ctx: ClientContext): Option<Collector> =
- config.getOption().map { createVesHvCollector(it, ctx) }
+ override fun invoke(ctx: ClientContext): Collector =
+ createVesHvCollector(ctx)
override fun close() = sinkProvider.close()
}
}
- private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(routing, sinkProvider, ctx, metrics),
+ router = Router(configuration.routing, sinkProvider, ctx, metrics),
metrics = metrics)
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index fab96560..3e19414d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
logger.info(clientContext::fullMdc) { "Handling new client connection" }
- return collectorProvider(clientContext).fold(
- {
- logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
- nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
- },
- handleClient(clientContext, nettyInbound)
- )
+ val collector = collectorProvider(clientContext)
+ return collector.handleClient(clientContext, nettyInbound)
}
- private fun handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+ private fun Collector.handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound) =
withConnectionFrom(nettyInbound) { connection ->
connection
.configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
.logConnectionClosed(clientContext)
}.run {
- collector.handleConnection(nettyInbound.createDataStream())
+ handleConnection(nettyInbound.createDataStream())
}
- }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 61a9a356..35dfba8b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
@@ -56,8 +57,7 @@ object PerformanceSpecification : Spek({
describe("VES High Volume Collector performance") {
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(basicRouting), sink)
val numMessages: Long = 300_000
val runs = 4
@@ -87,8 +87,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(basicRouting), sink)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index ec540606..1217c471 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
-import arrow.core.getOrElse
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
@@ -27,6 +26,7 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
@@ -34,8 +34,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
@@ -49,27 +47,22 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) : Closeable {
- val configurationProvider = FakeConfigurationProvider()
- val healthStateProvider = FakeHealthState()
+class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
val sinkProvider = DummySinkProvider(sink)
private val collectorFactory = CollectorFactory(
- configurationProvider,
+ configuration,
sinkProvider,
metrics,
- MAX_PAYLOAD_SIZE_BYTES,
- healthStateProvider
+ MAX_PAYLOAD_SIZE_BYTES
)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider(ClientContext(alloc)).getOrElse {
- throw IllegalStateException("Collector not available.")
- }
+ get() = collectorProvider(ClientContext(alloc))
fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
@@ -107,16 +100,10 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
private val timeout = Duration.ofSeconds(10)
fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
- Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(routing)
- }
+ Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
- Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(routing)
- }
+ Sut(CollectorConfiguration(routing), AlwaysFailingSink())
fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
- Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(routing)
- }
+ Sut(CollectorConfiguration(routing), DelayingSink(delay))
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 5d215fc5..6a718eea 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
@@ -166,9 +168,7 @@ object VesHvSpecification : Spek({
}
it("should be able to direct 2 messages from different domains to one topic") {
- val (sut, sink) = vesHvWithStoringSink()
-
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+ val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -202,150 +202,6 @@ object VesHvSpecification : Spek({
}
}
- describe("configuration update") {
-
- val defaultTimeout = Duration.ofSeconds(10)
-
- given("successful configuration change") {
-
- lateinit var sut: Sut
- lateinit var sink: StoringSink
-
- beforeEachTest {
- vesHvWithStoringSink().run {
- sut = first
- sink = second
- }
- }
-
- it("should update collector") {
- val firstCollector = sut.collector
-
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- val collectorAfterUpdate = sut.collector
-
- assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
- }
-
- it("should start routing messages") {
-
- sut.configurationProvider.updateConfiguration(emptyRouting)
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).isEmpty()
-
- sut.configurationProvider.updateConfiguration(basicRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(1)
- val message = messagesAfterUpdate[0]
-
- assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(None)
- }
-
- it("should change domain routing") {
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).hasSize(1)
- val firstMessage = messages[0]
-
- assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(None)
-
-
- sut.configurationProvider.updateConfiguration(alternativeRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(2)
- val secondMessage = messagesAfterUpdate[1]
-
- assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
- .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
- assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(None)
- }
-
- it("should update routing for each client sending one message") {
-
- val messagesAmount = 10
- val messagesForEachTopic = 5
-
- Flux.range(0, messagesAmount).doOnNext {
- if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- }
- }.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- }.then().block(defaultTimeout)
-
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messagesAmount)
- assertThat(messagesForEachTopic)
- .describedAs("amount of messages routed to each topic")
- .isEqualTo(firstTopicMessagesCount)
- .isEqualTo(secondTopicMessagesCount)
- }
-
- it("should not update routing for client sending continuous stream of messages") {
-
- val messageStreamSize = 10
- val pivot = 5
-
- val incomingMessages = Flux.range(0, messageStreamSize)
- .doOnNext {
- if (it == pivot) {
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- println("config changed")
- }
- }
- .map { vesWireFrameMessage(PERF3GPP) }
-
-
- sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messageStreamSize)
- assertThat(firstTopicMessagesCount)
- .describedAs("amount of messages routed to first topic")
- .isEqualTo(messageStreamSize)
-
- assertThat(secondTopicMessagesCount)
- .describedAs("amount of messages routed to second topic")
- .isEqualTo(0)
- }
-
- it("should mark the application healthy") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.HEALTHY)
- }
- }
-
- given("failed configuration change") {
- val (sut, _) = vesHvWithStoringSink()
- sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicRouting)
-
- it("should mark the application unhealthy ") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
- }
- }
- }
-
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
val (sut, sink) = vesHvWithStoringSink()
@@ -362,9 +218,8 @@ object VesHvSpecification : Spek({
})
-private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(routing), sink)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
deleted file mode 100644
index c25771b7..00000000
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import reactor.core.publisher.Flux
-
-class FakeHealthState : HealthState {
-
- lateinit var currentHealth: HealthDescription
-
- override fun changeState(healthDescription: HealthDescription) {
- currentHealth = healthDescription
- }
-
- override fun invoke(): Flux<HealthDescription> {
- throw NotImplementedError()
- }
-}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
deleted file mode 100644
index c465fd91..00000000
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import reactor.core.publisher.FluxProcessor
-import reactor.core.publisher.UnicastProcessor
-import reactor.retry.RetryExhaustedException
-
-
-class FakeConfigurationProvider : ConfigurationProvider {
- private var shouldThrowException = false
- private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
-
- fun updateConfiguration(routing: Routing) =
- if (shouldThrowException) {
- configStream.onError(RetryExhaustedException("I'm so tired"))
- } else {
- configStream.onNext(routing)
- }
-
-
- fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
- this.shouldThrowException = shouldThrowException
- }
-
- override fun invoke() = configStream
-}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 059e8028..22d8000e 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.main
import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -41,10 +42,25 @@ private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
fun main(args: Array<String>) {
+ val configStateListener = object : ConfigurationStateListener {
+ override fun retrying() {
+ HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ }
+ }
+
HealthCheckServer.start(configurationModule.healthCheckPort(args))
configurationModule
- .hvVesConfigurationUpdates(args)
+ .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
.publishOn(Schedulers.single(Schedulers.elastic()))
+ .doOnNext {
+ logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
+ HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
+ }
+ .doOnError {
+ logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+ logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
+ HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
+ }
.doOnNext(::startServer)
.doOnError(::logServerStartFailed)
.neverComplete() // TODO: remove after merging configuration stream with cbs
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index aed4d928..c079cc59 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -23,8 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.factory.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -59,11 +58,10 @@ object VesServer {
private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
CollectorFactory(
- AdapterFactory.configurationProvider(config.cbs),
+ config.collector,
AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes,
- HealthState.INSTANCE
+ config.server.maxPayloadSizeBytes
)
private fun logServerStarted(handle: ServerHandle) =