aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-29 11:22:24 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-01 12:32:42 +0200
commit6725abbaa6249e107126ffd5ec58f2a96ce60eee (patch)
treef3fa6d11a04b60a631ee4160a69744b44e08e1ed /sources/hv-collector-core
parent4281a12d8e892f46f5f2226ee0f8aee8b862b177 (diff)
Move ConfigurationProvider to config module
Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt)15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt110
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt16
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt207
8 files changed, 25 insertions, 402 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 1b92d90c..e3156a0d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,8 +39,6 @@ interface SinkProvider : Closeable {
operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
}
-typealias ConfigurationProvider = () -> Flux<Routing>
-
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 5c64c70b..ba0a9eee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -33,7 +32,7 @@ interface Collector {
}
interface CollectorProvider : Closeable {
- operator fun invoke(ctx: ClientContext): Option<Collector>
+ operator fun invoke(ctx: ClientContext): Collector
}
interface Server {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
index 2b123fc8..04e575ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,15 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.factory
-class ParsingException(message: String, cause: Throwable) : Exception(message, cause)
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object AdapterFactory {
+ fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2b29acd9..1c79abd2 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -19,64 +19,45 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import arrow.core.Option
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.util.concurrent.atomic.AtomicReference
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(private val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: CollectorConfiguration,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int,
- private val healthState: HealthState = HealthState.INSTANCE) {
+ private val maxPayloadSizeBytes: Int) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Routing>()
- configuration()
- .doOnNext {
- logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
- healthState.changeState(HealthDescription.HEALTHY)
- }
- .doOnError {
- logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
- logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
- healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
- }
- .subscribe(config::set)
return object : CollectorProvider {
- override fun invoke(ctx: ClientContext): Option<Collector> =
- config.getOption().map { createVesHvCollector(it, ctx) }
+ override fun invoke(ctx: ClientContext): Collector =
+ createVesHvCollector(ctx)
override fun close() = sinkProvider.close()
}
}
- private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(routing, sinkProvider, ctx, metrics),
+ router = Router(configuration.routing, sinkProvider, ctx, metrics),
metrics = metrics)
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
deleted file mode 100644
index 20b11753..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object AdapterFactory {
- fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
-
- fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
- ConfigurationProviderImpl(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- config)
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
deleted file mode 100644
index 185693c0..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import com.google.gson.JsonObject
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.time.Duration
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- private val streamParser: StreamFromGsonParser<KafkaSink>,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
- constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
- cbsClientMono,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- StreamFromGsonParsers.kafkaSinkParser(),
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval)
- .jitter(Jitter.random())
- )
-
- private val retry = retrySpec.doOnRetry {
- logger.withWarn(ServiceContext::mdc) {
- log("Exception from configuration provider client, retrying subscription", it.exception())
- }
- healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- }
-
- override fun invoke(): Flux<Routing> =
- cbsClientMono
- .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
- .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
- .retryWhen(retry)
- .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
- .flatMapMany(::handleUpdates)
-
- private fun handleUpdates(cbsClient: CbsClient) = cbsClient
- .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
- firstRequestDelay,
- requestInterval)
- .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
- .map(::createRoutingDescription)
- .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
- .retryWhen(retry)
-
- private fun createRoutingDescription(configuration: JsonObject): Routing = try {
- DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
- .map(streamParser::unsafeParse)
- .map { Route(it.name(), it) }
- .asIterable()
- .toList()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
- companion object {
- private const val MAX_RETRIES = 5L
- private val logger = Logger(ConfigurationProviderImpl::class)
- }
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index fab96560..3e19414d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
logger.info(clientContext::fullMdc) { "Handling new client connection" }
- return collectorProvider(clientContext).fold(
- {
- logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
- nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
- },
- handleClient(clientContext, nettyInbound)
- )
+ val collector = collectorProvider(clientContext)
+ return collector.handleClient(clientContext, nettyInbound)
}
- private fun handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+ private fun Collector.handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound) =
withConnectionFrom(nettyInbound) { connection ->
connection
.configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
.logConnectionClosed(clientContext)
}.run {
- collector.handleConnection(nettyInbound.createDataStream())
+ handleConnection(nettyInbound.createDataStream())
}
- }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
deleted file mode 100644
index 8616ce03..00000000
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import com.google.gson.JsonParser
-import com.nhaarman.mockitokotlin2.any
-import com.nhaarman.mockitokotlin2.eq
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import reactor.core.publisher.Flux
-
-import reactor.core.publisher.Mono
-import reactor.retry.Retry
-import reactor.test.StepVerifier
-import java.time.Duration
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal object ConfigurationProviderImplTest : Spek({
-
- describe("Configuration provider") {
-
- val cbsClient: CbsClient = mock()
- val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
- val healthStateProvider = HealthState.INSTANCE
-
- given("configuration is never in cbs") {
- val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
-
- on("waiting for configuration") {
- val waitTime = Duration.ofMillis(100)
-
- it("should not get it") {
- StepVerifier.create(configProvider().take(1))
- .expectNoEvent(waitTime)
- }
- }
- }
-
- given("valid configuration from cbs") {
- val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
-
- on("new configuration") {
- whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
- .thenReturn(Flux.just(validConfiguration))
- it("should use received configuration") {
-
- StepVerifier.create(configProvider().take(1))
- .consumeNextWith {
- val route1 = it.elementAt(0)
- val route2 = it.elementAt(1)
- val receivedSink1 = route1.sink
- val receivedSink2 = route2.sink
-
- assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
- assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
- assertThat(receivedSink1.bootstrapServers())
- .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
- assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
-
- assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
- assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
- assertThat(receivedSink2.bootstrapServers())
- .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
- assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
-
- }.verifyComplete()
- }
- }
-
- }
- given("invalid configuration from cbs") {
- val iterationCount = 3L
- val configProvider = constructConfigurationProvider(
- cbsClientMock, healthStateProvider, iterationCount
- )
-
- on("new configuration") {
- whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
- .thenReturn(Flux.just(invalidConfiguration))
-
- it("should interrupt the flux") {
- StepVerifier.create(configProvider())
- .verifyError()
- }
-
- it("should update the health state") {
- StepVerifier.create(healthStateProvider().take(iterationCount))
- .expectNextCount(iterationCount - 1)
- .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- .verifyComplete()
- }
- }
- }
- }
-
-})
-
-
-val PERF3GPP_REGIONAL = "perf3gpp_regional"
-val PERF3GPP_CENTRAL = "perf3gpp_central"
-
-private val aafCredentials1 = ImmutableAafCredentials.builder()
- .username("client")
- .password("very secure password")
- .build()
-
-private val aafCredentials2 = ImmutableAafCredentials.builder()
- .username("other_client")
- .password("another very secure password")
- .build()
-
-private val validConfiguration = JsonParser().parse("""
-{
- "streams_publishes": {
- "$PERF3GPP_REGIONAL": {
- "type": "kafka",
- "aaf_credentials": {
- "username": "client",
- "password": "very secure password"
- },
- "kafka_info": {
- "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
- "topic_name": "REG_HVVES_PERF3GPP"
- }
- },
- "$PERF3GPP_CENTRAL": {
- "type": "kafka",
- "aaf_credentials": {
- "username": "other_client",
- "password": "another very secure password"
- },
- "kafka_info": {
- "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
- "topic_name": "CEN_HVVES_PERF3GPP"
- }
- }
- }
-}""").asJsonObject
-
-private val invalidConfiguration = JsonParser().parse("""
-{
- "streams_publishes": {
- "$PERF3GPP_REGIONAL": {
- "type": "kafka",
- "aaf_credentials": {
- "username": "client",
- "password": "very secure password"
- },
- "kafka_info": {
- "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
- "popic_name": "REG_HVVES_PERF3GPP"
- }
- }
- }
-}""").asJsonObject
-
-private val firstRequestDelay = Duration.ofMillis(1)
-private val requestInterval = Duration.ofMillis(1)
-private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
-
-private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
- healthState: HealthState,
- iterationCount: Long = 1
-): ConfigurationProviderImpl {
-
- val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
-
- return ConfigurationProviderImpl(
- cbsClientMono,
- firstRequestDelay,
- requestInterval,
- healthState,
- streamParser,
- retry
- )
-}