diff options
Diffstat (limited to 'sources/hv-collector-core/src')
9 files changed, 80 insertions, 67 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 84310802..782d2324 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.boundary -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.utils.Closeable import reactor.core.publisher.Flux @@ -48,5 +48,5 @@ interface SinkProvider : Closeable { } interface ConfigurationProvider { - operator fun invoke(): Flux<CollectorConfiguration> + operator fun invoke(): Flux<Routing> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3ea14385..c08df748 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -20,13 +20,12 @@ package org.onap.dcae.collectors.veshv.factory import arrow.core.Option -import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState @@ -44,14 +43,14 @@ import java.util.concurrent.atomic.AtomicReference * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maximumPayloadSizeBytes: Int, + private val maxPayloadSizeBytes: Int, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { - val config: AtomicReference<CollectorConfiguration> = AtomicReference() + val config = AtomicReference<Routing>() configuration() .doOnNext { logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } @@ -72,12 +71,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, } } - private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = + private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(config.routing, ctx), + router = Router(routing, ctx), sink = sinkProvider(ctx), metrics = metrics) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 58a8599a..6c4e4671 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory /** @@ -31,8 +32,17 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory * @since May 2018 */ object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, + + private val sslFactory = SslContextFactory() + + fun createNettyTcpServer(serverConfig: ServerConfiguration, + securityConfig: SecurityConfiguration, collectorProvider: CollectorProvider, - metrics: Metrics): Server = - NettyTcpServer(serverConfiguration, SslContextFactory(), collectorProvider, metrics) + metrics: Metrics + ): Server = NettyTcpServer( + serverConfig, + sslFactory.createServerContext(securityConfig), + collectorProvider, + metrics + ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index a853839a..c362020e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -21,9 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties @@ -32,15 +32,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(dummyMode: Boolean, - kafkaConfig: KafkaConfiguration): SinkProvider = - if (dummyMode) + fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider = + if (config.dummyMode) LoggingSinkProvider() else - KafkaSinkProvider(kafkaConfig) + KafkaSinkProvider(config) - fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + fun configurationProvider(config: CbsConfiguration): ConfigurationProvider = ConfigurationProviderImpl( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - configurationProviderParams) + config) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 51b6d4f0..754a2efc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -21,11 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.google.gson.JsonObject import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.config.api.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog @@ -49,7 +49,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie retrySpec: Retry<Any> ) : ConfigurationProvider { - constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this( + constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this( cbsClientMono, params.firstRequestDelay, params.requestInterval, @@ -67,7 +67,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) } - override fun invoke(): Flux<CollectorConfiguration> = + override fun invoke(): Flux<Routing> = cbsClientMono .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } @@ -75,7 +75,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } .flatMapMany(::handleUpdates) - private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient + private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient .updates(RequestDiagnosticContext.create(), firstRequestDelay, requestInterval) @@ -85,21 +85,19 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .retryWhen(retry) - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + private fun createCollectorConfiguration(configuration: JsonObject): Routing = try { val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) - CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject - defineRoute { - fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - ) + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject + defineRoute { + fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() } catch (e: NullPointerException) { throw ParsingException("Failed to parse configuration", e) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index f52890b0..96e45a02 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender @@ -46,7 +46,7 @@ import java.lang.Integer.max internal class KafkaSinkProvider internal constructor( private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider { - constructor(config: KafkaConfiguration) : this(constructKafkaSender(config)) + constructor(config: CollectorConfiguration) : this(constructKafkaSender(config)) override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) @@ -60,14 +60,15 @@ internal class KafkaSinkProvider internal constructor( private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 - private fun constructKafkaSender(config: KafkaConfiguration) = + + private fun constructKafkaSender(config: CollectorConfiguration) = KafkaSender.create(constructSenderOptions(config)) - private fun constructSenderOptions(config: KafkaConfiguration) = + private fun constructSenderOptions(config: CollectorConfiguration) = SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) - .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config)) - .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config)) + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers) + .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes)) + .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes)) .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) @@ -75,10 +76,10 @@ internal class KafkaSinkProvider internal constructor( .producerProperty(ACKS_CONFIG, "1") .stopOnError(false) - private fun maxRequestSize(config: KafkaConfiguration) = - (MAXIMUM_REQUEST_SIZE_MULTIPLIER * config.maximalRequestSizeBytes).toInt() + private fun maxRequestSize(maxRequestSizeBytes: Int) = + (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt() - private fun bufferMemory(config: KafkaConfiguration) = - max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * config.maximalRequestSizeBytes) + private fun bufferMemory(maxRequestSizeBytes: Int) = + max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 123956ad..fab96560 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.core.Option import arrow.core.getOrElse import arrow.effects.IO +import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics @@ -30,7 +32,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -41,6 +42,7 @@ import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer import java.net.InetAddress +import java.net.InetSocketAddress import java.time.Duration @@ -48,14 +50,14 @@ import java.time.Duration * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class NettyTcpServer(private val serverConfig: ServerConfiguration, - private val sslContextFactory: SslContextFactory, +internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, + private val sslContext: Option<SslContext>, private val collectorProvider: CollectorProvider, private val metrics: Metrics) : Server { override fun start(): IO<ServerHandle> = IO { TcpServer.create() - .addressSupplier { serverConfig.serverListenAddress } + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } .configureSsl() .handle(this::handleConnection) .doOnUnbound { @@ -66,11 +68,10 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, } private fun TcpServer.configureSsl() = - sslContextFactory - .createServerContext(serverConfig.securityConfiguration) - .map { sslContext -> + sslContext + .map { serverContext -> logger.info { "Collector configured with SSL enabled" } - this.secure { b -> b.sslContext(sslContext) } + this.secure { it.sslContext(serverContext) } }.getOrElse { logger.info { "Collector configured with SSL disabled" } this @@ -125,7 +126,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector -> withConnectionFrom(nettyInbound) { connection -> connection - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) .logConnectionClosed(clientContext) }.run { collector.handleConnection(nettyInbound.createDataStream()) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index 21aaa129..f830f2c9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -78,7 +78,7 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val route1 = it.routing.routes[0] + val route1 = it.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") .isEqualTo(route1.domain) @@ -86,7 +86,7 @@ internal object ConfigurationProviderImplTest : Spek({ .describedAs("target topic 1") .isEqualTo(route1.targetTopic) - val route2 = it.routing.routes[1] + val route2 = it.routes[1] assertThat(HEARTBEAT.domainName) .describedAs("routed domain 2") .isEqualTo(route2.domain) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt index 068476ad..1e3f2e7a 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -28,9 +28,10 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.routing import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.ves.VesEventOuterClass import reactor.kafka.sender.KafkaSender @@ -41,8 +42,12 @@ import reactor.kafka.sender.KafkaSender internal object KafkaSinkProviderTest : Spek({ describe("non functional requirements") { given("sample configuration") { - val config = KafkaConfiguration("localhost:9090", - 1024 * 1024) + val config = CollectorConfiguration( + dummyMode = false, + maxRequestSizeBytes = 1024 * 1024, + kafkaServers = "localhost:9090", + routing = routing { }.build()) + val cut = KafkaSinkProvider(config) on("sample clients") { |