From 6725abbaa6249e107126ffd5ec58f2a96ce60eee Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 29 Mar 2019 11:22:24 +0100 Subject: Move ConfigurationProvider to config module Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/boundary/adapters.kt | 3 - .../org/onap/dcae/collectors/veshv/boundary/api.kt | 3 +- .../collectors/veshv/factory/AdapterFactory.kt | 31 +++ .../collectors/veshv/factory/CollectorFactory.kt | 33 +--- .../veshv/impl/adapters/AdapterFactory.kt | 40 ---- .../impl/adapters/ConfigurationProviderImpl.kt | 110 ----------- .../veshv/impl/adapters/ParsingException.kt | 22 --- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 16 +- .../impl/adapters/ConfigurationProviderImplTest.kt | 207 --------------------- 9 files changed, 44 insertions(+), 421 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt (limited to 'sources/hv-collector-core') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 1b92d90c..e3156a0d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,8 +39,6 @@ interface SinkProvider : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy } -typealias ConfigurationProvider = () -> Flux - interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 5c64c70b..ba0a9eee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext @@ -33,7 +32,7 @@ interface Collector { } interface CollectorProvider : Closeable { - operator fun invoke(ctx: ClientContext): Option + operator fun invoke(ctx: ClientContext): Collector } interface Server { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt new file mode 100644 index 00000000..04e575ae --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +object AdapterFactory { + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2b29acd9..1c79abd2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,64 +19,45 @@ */ package org.onap.dcae.collectors.veshv.factory -import arrow.core.Option import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk * @since May 2018 */ -class CollectorFactory(private val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: CollectorConfiguration, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maxPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { + private val maxPayloadSizeBytes: Int) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference() - configuration() - .doOnNext { - logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } - logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } - healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - .subscribe(config::set) return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Option = - config.getOption().map { createVesHvCollector(it, ctx) } + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) override fun close() = sinkProvider.close() } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt deleted file mode 100644 index 20b11753..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -object AdapterFactory { - fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() - - fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = - ConfigurationProviderImpl( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - config) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt deleted file mode 100644 index 185693c0..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ /dev/null @@ -1,110 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.google.gson.JsonObject -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal class ConfigurationProviderImpl(private val cbsClientMono: Mono, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - private val streamParser: StreamFromGsonParser, - retrySpec: Retry - -) : ConfigurationProvider { - constructor(cbsClientMono: Mono, params: CbsConfiguration) : this( - cbsClientMono, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - StreamFromGsonParsers.kafkaSinkParser(), - Retry.any() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval) - .jitter(Jitter.random()) - ) - - private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { - log("Exception from configuration provider client, retrying subscription", it.exception()) - } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - } - - override fun invoke(): Flux = - cbsClientMono - .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } - .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } - .retryWhen(retry) - .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } - .flatMapMany(::handleUpdates) - - private fun handleUpdates(cbsClient: CbsClient) = cbsClient - .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), - firstRequestDelay, - requestInterval) - .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createRoutingDescription) - .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } - .retryWhen(retry) - - private fun createRoutingDescription(configuration: JsonObject): Routing = try { - DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - - companion object { - private const val MAX_RETRIES = 5L - private val logger = Logger(ConfigurationProviderImpl::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt deleted file mode 100644 index 2b123fc8..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -class ParsingException(message: String, cause: Throwable) : Exception(message, cause) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index fab96560..3e19414d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { metrics.notifyClientConnected() logger.info(clientContext::fullMdc) { "Handling new client connection" } - return collectorProvider(clientContext).fold( - { - logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } - nettyInbound.closeConnectionAndReturn(Mono.empty()) - }, - handleClient(clientContext, nettyInbound) - ) + val collector = collectorProvider(clientContext) + return collector.handleClient(clientContext, nettyInbound) } - private fun handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound): (Collector) -> Mono = { collector -> + private fun Collector.handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound) = withConnectionFrom(nettyInbound) { connection -> connection .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { - collector.handleConnection(nettyInbound.createDataStream()) + handleConnection(nettyInbound.createDataStream()) } - } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt deleted file mode 100644 index 8616ce03..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt +++ /dev/null @@ -1,207 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.google.gson.JsonParser -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import reactor.core.publisher.Flux - -import reactor.core.publisher.Mono -import reactor.retry.Retry -import reactor.test.StepVerifier -import java.time.Duration - -/** - * @author Jakub Dudycz - * @since May 2018 - */ -internal object ConfigurationProviderImplTest : Spek({ - - describe("Configuration provider") { - - val cbsClient: CbsClient = mock() - val cbsClientMock: Mono = Mono.just(cbsClient) - val healthStateProvider = HealthState.INSTANCE - - given("configuration is never in cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - - on("waiting for configuration") { - val waitTime = Duration.ofMillis(100) - - it("should not get it") { - StepVerifier.create(configProvider().take(1)) - .expectNoEvent(waitTime) - } - } - } - - given("valid configuration from cbs") { - val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - - on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) - .thenReturn(Flux.just(validConfiguration)) - it("should use received configuration") { - - StepVerifier.create(configProvider().take(1)) - .consumeNextWith { - val route1 = it.elementAt(0) - val route2 = it.elementAt(1) - val receivedSink1 = route1.sink - val receivedSink2 = route2.sink - - assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) - assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) - assertThat(receivedSink1.bootstrapServers()) - .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") - assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") - - assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) - assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) - assertThat(receivedSink2.bootstrapServers()) - .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") - assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") - - }.verifyComplete() - } - } - - } - given("invalid configuration from cbs") { - val iterationCount = 3L - val configProvider = constructConfigurationProvider( - cbsClientMock, healthStateProvider, iterationCount - ) - - on("new configuration") { - whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) - .thenReturn(Flux.just(invalidConfiguration)) - - it("should interrupt the flux") { - StepVerifier.create(configProvider()) - .verifyError() - } - - it("should update the health state") { - StepVerifier.create(healthStateProvider().take(iterationCount)) - .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - .verifyComplete() - } - } - } - } - -}) - - -val PERF3GPP_REGIONAL = "perf3gpp_regional" -val PERF3GPP_CENTRAL = "perf3gpp_central" - -private val aafCredentials1 = ImmutableAafCredentials.builder() - .username("client") - .password("very secure password") - .build() - -private val aafCredentials2 = ImmutableAafCredentials.builder() - .username("other_client") - .password("another very secure password") - .build() - -private val validConfiguration = JsonParser().parse(""" -{ - "streams_publishes": { - "$PERF3GPP_REGIONAL": { - "type": "kafka", - "aaf_credentials": { - "username": "client", - "password": "very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "topic_name": "REG_HVVES_PERF3GPP" - } - }, - "$PERF3GPP_CENTRAL": { - "type": "kafka", - "aaf_credentials": { - "username": "other_client", - "password": "another very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060", - "topic_name": "CEN_HVVES_PERF3GPP" - } - } - } -}""").asJsonObject - -private val invalidConfiguration = JsonParser().parse(""" -{ - "streams_publishes": { - "$PERF3GPP_REGIONAL": { - "type": "kafka", - "aaf_credentials": { - "username": "client", - "password": "very secure password" - }, - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060", - "popic_name": "REG_HVVES_PERF3GPP" - } - } - } -}""").asJsonObject - -private val firstRequestDelay = Duration.ofMillis(1) -private val requestInterval = Duration.ofMillis(1) -private val streamParser = StreamFromGsonParsers.kafkaSinkParser() - -private fun constructConfigurationProvider(cbsClientMono: Mono, - healthState: HealthState, - iterationCount: Long = 1 -): ConfigurationProviderImpl { - - val retry = Retry.onlyIf { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - - return ConfigurationProviderImpl( - cbsClientMono, - firstRequestDelay, - requestInterval, - healthState, - streamParser, - retry - ) -} -- cgit 1.2.3-korg