From 30afcb56b0c6c4529fdaf68d7b061eee44d68d16 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Wed, 13 Mar 2019 18:44:31 +0100 Subject: Remove environment variables and program arguments - Move all command line program arguments to json file. - Reorganize configuration classes and the way they are passed through application - Implement HV VES configuration stream - Create concrete configuration from partial one - Modify main HV-VES server starting pipeline Change-Id: I6cf874b6904ed768e4820b8132f5f760299c929e Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1340 --- .../dcae/collectors/veshv/boundary/adapters.kt | 6 ++-- .../collectors/veshv/factory/CollectorFactory.kt | 15 +++++----- .../dcae/collectors/veshv/factory/ServerFactory.kt | 16 ++++++++-- .../veshv/impl/adapters/AdapterFactory.kt | 15 +++++----- .../impl/adapters/ConfigurationProviderImpl.kt | 34 ++++++++++------------ .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 25 ++++++++-------- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 19 ++++++------ .../impl/adapters/ConfigurationProviderTest.kt | 4 +-- .../impl/adapters/kafka/KafkaSinkProviderTest.kt | 13 ++++++--- 9 files changed, 80 insertions(+), 67 deletions(-) (limited to 'sources/hv-collector-core') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 84310802..782d2324 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.utils.Closeable import reactor.core.publisher.Flux @@ -48,5 +48,5 @@ interface SinkProvider : Closeable { } interface ConfigurationProvider { - operator fun invoke(): Flux + operator fun invoke(): Flux } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3ea14385..c08df748 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -20,13 +20,12 @@ package org.onap.dcae.collectors.veshv.factory import arrow.core.Option -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference * @author Piotr Jaszczyk * @since May 2018 */ -class CollectorFactory(val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maximumPayloadSizeBytes: Int, + private val maxPayloadSizeBytes: Int, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config: AtomicReference = AtomicReference() + val config = AtomicReference() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(config.routing, ctx), + router = Router(routing, ctx), sink = sinkProvider(ctx), metrics = metrics) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 58a8599a..6c4e4671 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory /** @@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory * @since May 2018 */ object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, + + private val sslFactory = SslContextFactory() + + fun createNettyTcpServer(serverConfig: ServerConfiguration, + securityConfig: SecurityConfiguration, collectorProvider: CollectorProvider, - metrics: Metrics): Server = - NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics) + metrics: Metrics + ): Server = NettyTcpServer( + serverConfig, + sslFactory.createServerContext(securityConfig), + collectorProvider, + metrics + ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index a853839a..c362020e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(dummyMode: Boolean, - kafkaConfig: KafkaConfiguration): SinkProvider = - if (dummyMode) + fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = + if (config.dummyMode) LoggingSinkProvider() else - KafkaSinkProvider(kafkaConfig) + KafkaSinkProvider(config) - fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - configurationProviderParams) + config) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 51b6d4f0..754a2efc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog @@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono ) : ConfigurationProvider { - constructor(cbsClientMono: Mono, params: ConfigurationProviderParams) : this( + constructor(cbsClientMono: Mono, params: CbsConfiguration) : this( cbsClientMono, params.firstRequestDelay, params.requestInterval, @@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono = + override fun invoke(): Flux = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux = cbsClient .updates(RequestDiagnosticContext.create(), firstRequestDelay, requestInterval) @@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono) : SinkProvider { - constructor(config: KafkaConfiguration) : this(constructKafkaSender(config)) + constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) @@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor( private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - private fun constructKafkaSender(config: KafkaConfiguration) = + + private fun constructKafkaSender(config: CollectorConfiguration) = KafkaSender.create(constructSenderOptions(config)) - private fun constructSenderOptions(config: KafkaConfiguration) = + private fun constructSenderOptions(config: CollectorConfiguration) = SenderOptions.create() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config)) + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) + .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) + .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) @@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor( .producerProperty(ACKS_CONFIG, "1") .stopOnError(false) - private fun maxRequestSize(config: KafkaConfiguration) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt() + private fun maxRequestSize(maxRequestSizeBytes: Int) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - private fun bufferMemory(config: KafkaConfiguration) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes) + private fun bufferMemory(maxRequestSizeBytes: Int) = + max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 123956ad..fab96560 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.core.Option import arrow.core.getOrElse import arrow.effects.IO +import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics @@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -41,6 +42,7 @@ import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer import java.net.InetAddress +import java.net.InetSocketAddress import java.time.Duration @@ -48,14 +50,14 @@ import java.time.Duration * @author Piotr Jaszczyk * @since May 2018 */ -internal class NettyTcpServer(private val serverConfig: ServerConfiguration, - private val sslContextFactory: SslContextFactory, +internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, + private val sslContext: Option, private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { override fun start(): IO = IO { TcpServer.create() - .addressSupplier { serverConfig.serverListenAddress } + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } .configureSsl() .handle(this::handleConnection) .doOnUnbound { @@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } private fun TcpServer.configureSsl() = - sslContextFactory - .createServerContext(serverConfig.securityConfiguration) - .map { sslContext -> + sslContext + .map { serverContext -> logger.info { "Collector configured with SSL enabled" } - this.secure { b -> b.sslContext(sslContext) } + this.secure { it.sslContext(serverContext) } }.getOrElse { logger.info { "Collector configured with SSL disabled" } this @@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, nettyInbound: NettyInbound): (Collector) -> Mono = { collector -> withConnectionFrom(nettyInbound) { connection -> connection - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { collector.handleConnection(nettyInbound.createDataStream()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index 21aaa129..f830f2c9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val route1 = it.routing.routes[0] + val route1 = it.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") .isEqualTo(route1.domain) @@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({ .describedAs("target topic 1") .isEqualTo(route1.targetTopic) - val route2 = it.routing.routes[1] + val route2 = it.routes[1] assertThat(HEARTBEAT.domainName) .describedAs("routed domain 2") .isEqualTo(route2.domain) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 068476ad..1e3f2e7a 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.ves.VesEventOuterClass import reactor.kafka.sender.KafkaSender @@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender internal object KafkaSinkProviderTest : Spek({ describe("non functional requirements") { given("sample configuration") { - val config = KafkaConfiguration("localhost:9090", - 1024 * 1024) + val config = CollectorConfiguration( + dummyMode = false, + maxRequestSizeBytes = 1024 * 1024, + kafkaServers = "localhost:9090", + routing = routing { }.build()) + val cut = KafkaSinkProvider(config) on("sample clients") { -- cgit 1.2.3-korg