diff options
Diffstat (limited to 'sources/hv-collector-core')
8 files changed, 31 insertions, 41 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index e3156a0d..48f335a1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -35,7 +35,7 @@ interface Sink : Closeable { fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> } -interface SinkProvider : Closeable { +interface SinkFactory : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 0039ef62..4c54d7d2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -30,7 +30,7 @@ interface Collector { fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -interface CollectorProvider : Closeable { +interface CollectorFactory : Closeable { operator fun invoke(ctx: ClientContext): Collector } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt index 04e575ae..70f61b6c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() + fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory() } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt index 8fb4e80d..3524f14c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt @@ -20,46 +20,36 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(private val configuration: CollectorConfiguration, - private val sinkProvider: SinkProvider, - private val metrics: Metrics, - private val maxPayloadSizeBytes: Int) { +class HvVesCollectorFactory(private val configuration: CollectorConfiguration, + private val sinkFactory: SinkFactory, + private val metrics: Metrics, + private val maxPayloadSizeBytes: Int): CollectorFactory { - fun createVesHvCollectorProvider(): CollectorProvider { + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) - return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Collector = - createVesHvCollector(ctx) - - override fun close() = sinkProvider.close() - } - } + override fun close() = sinkFactory.close() private fun createVesHvCollector(ctx: ClientContext): Collector = HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(configuration.routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkFactory, ctx, metrics), metrics = metrics) - - companion object { - private val logger = Logger(CollectorFactory::class) - } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 6c4e4671..e0f611b6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration @@ -37,12 +37,12 @@ object ServerFactory { fun createNettyTcpServer(serverConfig: ServerConfiguration, securityConfig: SecurityConfiguration, - collectorProvider: CollectorProvider, + collectorFactory: CollectorFactory, metrics: Metrics ): Server = NettyTcpServer( serverConfig, sslFactory.createServerContext(securityConfig), - collectorProvider, + collectorFactory, metrics ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index b03b89e1..fe34a9c7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -23,7 +23,7 @@ import arrow.core.None import arrow.core.toOption import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,11 +40,11 @@ class Router internal constructor(private val routing: Routing, private val ctx: ClientContext, private val metrics: Metrics) { constructor(routing: Routing, - sinkProvider: SinkProvider, + sinkFactory: SinkFactory, ctx: ClientContext, metrics: Metrics) : this(routing, - constructMessageSinks(routing, sinkProvider, ctx), + constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) { logger.debug(ctx::mdc) { "Routing for client: $routing" } @@ -87,11 +87,11 @@ class Router internal constructor(private val routing: Routing, private val NONE_PARTITION = None internal fun constructMessageSinks(routing: Routing, - sinkProvider: SinkProvider, + sinkFactory: SinkFactory, ctx: ClientContext) = routing.map(Route::sink) .distinctBy { it.topicName() } - .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) + .associateBy({ it.topicName() }, { sinkFactory(it, ctx) }) } private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt index 86980832..9df1af31 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext @@ -37,7 +37,7 @@ import java.util.Collections.synchronizedMap * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -internal class KafkaSinkProvider : SinkProvider { +internal class KafkaSinkFactory : SinkFactory { private val messageSinks = synchronizedMap( mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>() ) @@ -58,6 +58,6 @@ internal class KafkaSinkProvider : SinkProvider { } companion object { - private val logger = Logger(KafkaSinkProvider::class) + private val logger = Logger(KafkaSinkFactory::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a208384a..7ce86f98 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,7 +23,7 @@ import arrow.core.Option import arrow.core.getOrElse import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration @@ -51,7 +51,7 @@ import java.time.Duration */ internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, private val sslContext: Option<SslContext>, - private val collectorProvider: CollectorProvider, + private val collectorFactory: CollectorFactory, private val metrics: Metrics) : Server { override fun start(): Mono<ServerHandle> = @@ -67,7 +67,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati } private fun closeAction(): Mono<Void> = - collectorProvider.close().doOnSuccess { + collectorFactory.close().doOnSuccess { logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } } @@ -118,7 +118,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } - val collector = collectorProvider(clientContext) + val collector = collectorFactory(clientContext) return collector.handleClient(clientContext, nettyInbound) } |