diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-29 11:22:24 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-01 12:32:42 +0200 |
commit | 6725abbaa6249e107126ffd5ec58f2a96ce60eee (patch) | |
tree | f3fa6d11a04b60a631ee4160a69744b44e08e1ed /sources/hv-collector-core/src/main/kotlin/org | |
parent | 4281a12d8e892f46f5f2226ee0f8aee8b862b177 (diff) |
Move ConfigurationProvider to config module
Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d
Issue-ID: DCAEGEN2-1347
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt | 3 | ||||
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt | 3 | ||||
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt) | 15 | ||||
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt | 33 | ||||
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt | 40 | ||||
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt | 110 | ||||
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt | 16 |
7 files changed, 25 insertions, 195 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 1b92d90c..e3156a0d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,8 +39,6 @@ interface SinkProvider : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> } -typealias ConfigurationProvider = () -> Flux<Routing> - interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 5c64c70b..ba0a9eee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.boundary -import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.ClientContext @@ -33,7 +32,7 @@ interface Collector { } interface CollectorProvider : Closeable { - operator fun invoke(ctx: ClientContext): Option<Collector> + operator fun invoke(ctx: ClientContext): Collector } interface Server { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt index 2b123fc8..04e575ae 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,15 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.factory -class ParsingException(message: String, cause: Throwable) : Exception(message, cause) +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object AdapterFactory { + fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2b29acd9..1c79abd2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,64 +19,45 @@ */ package org.onap.dcae.collectors.veshv.factory -import arrow.core.Option import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(private val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: CollectorConfiguration, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maxPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { + private val maxPayloadSizeBytes: Int) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Routing>() - configuration() - .doOnNext { - logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } - logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } - healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - .subscribe(config::set) return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Option<Collector> = - config.getOption().map { createVesHvCollector(it, ctx) } + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) override fun close() = sinkProvider.close() } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt deleted file mode 100644 index 20b11753..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object AdapterFactory { - fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() - - fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = - ConfigurationProviderImpl( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - config) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt deleted file mode 100644 index 185693c0..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ /dev/null @@ -1,110 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.google.gson.JsonObject -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog -import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.time.Duration - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - private val streamParser: StreamFromGsonParser<KafkaSink>, - retrySpec: Retry<Any> - -) : ConfigurationProvider { - constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this( - cbsClientMono, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - StreamFromGsonParsers.kafkaSinkParser(), - Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval) - .jitter(Jitter.random()) - ) - - private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { - log("Exception from configuration provider client, retrying subscription", it.exception()) - } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - } - - override fun invoke(): Flux<Routing> = - cbsClientMono - .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } - .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } - .retryWhen(retry) - .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } - .flatMapMany(::handleUpdates) - - private fun handleUpdates(cbsClient: CbsClient) = cbsClient - .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()), - firstRequestDelay, - requestInterval) - .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } - .map(::createRoutingDescription) - .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } - .retryWhen(retry) - - private fun createRoutingDescription(configuration: JsonObject): Routing = try { - DataStreams.namedSinks(configuration) - .filter(streamOfType(KAFKA)) - .map(streamParser::unsafeParse) - .map { Route(it.name(), it) } - .asIterable() - .toList() - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - - companion object { - private const val MAX_RETRIES = 5L - private val logger = Logger(ConfigurationProviderImpl::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index fab96560..3e19414d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() logger.info(clientContext::fullMdc) { "Handling new client connection" } - return collectorProvider(clientContext).fold( - { - logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } - nettyInbound.closeConnectionAndReturn(Mono.empty<Void>()) - }, - handleClient(clientContext, nettyInbound) - ) + val collector = collectorProvider(clientContext) + return collector.handleClient(clientContext, nettyInbound) } - private fun handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector -> + private fun Collector.handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound) = withConnectionFrom(nettyInbound) { connection -> connection .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { - collector.handleConnection(nettyInbound.createDataStream()) + handleConnection(nettyInbound.createDataStream()) } - } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { |