diff options
10 files changed, 40 insertions, 55 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index e3156a0d..48f335a1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -35,7 +35,7 @@ interface Sink : Closeable { fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> } -interface SinkProvider : Closeable { +interface SinkFactory : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink> } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 0039ef62..4c54d7d2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -30,7 +30,7 @@ interface Collector { fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> } -interface CollectorProvider : Closeable { +interface CollectorFactory : Closeable { operator fun invoke(ctx: ClientContext): Collector } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt index 04e575ae..70f61b6c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() + fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory() } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt index 8fb4e80d..3524f14c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt @@ -20,46 +20,36 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(private val configuration: CollectorConfiguration, - private val sinkProvider: SinkProvider, - private val metrics: Metrics, - private val maxPayloadSizeBytes: Int) { +class HvVesCollectorFactory(private val configuration: CollectorConfiguration, + private val sinkFactory: SinkFactory, + private val metrics: Metrics, + private val maxPayloadSizeBytes: Int): CollectorFactory { - fun createVesHvCollectorProvider(): CollectorProvider { + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) - return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Collector = - createVesHvCollector(ctx) - - override fun close() = sinkProvider.close() - } - } + override fun close() = sinkFactory.close() private fun createVesHvCollector(ctx: ClientContext): Collector = HvVesCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(configuration.routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkFactory, ctx, metrics), metrics = metrics) - - companion object { - private val logger = Logger(CollectorFactory::class) - } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 6c4e4671..e0f611b6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration @@ -37,12 +37,12 @@ object ServerFactory { fun createNettyTcpServer(serverConfig: ServerConfiguration, securityConfig: SecurityConfiguration, - collectorProvider: CollectorProvider, + collectorFactory: CollectorFactory, metrics: Metrics ): Server = NettyTcpServer( serverConfig, sslFactory.createServerContext(securityConfig), - collectorProvider, + collectorFactory, metrics ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index b03b89e1..fe34a9c7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -23,7 +23,7 @@ import arrow.core.None import arrow.core.toOption import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,11 +40,11 @@ class Router internal constructor(private val routing: Routing, private val ctx: ClientContext, private val metrics: Metrics) { constructor(routing: Routing, - sinkProvider: SinkProvider, + sinkFactory: SinkFactory, ctx: ClientContext, metrics: Metrics) : this(routing, - constructMessageSinks(routing, sinkProvider, ctx), + constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) { logger.debug(ctx::mdc) { "Routing for client: $routing" } @@ -87,11 +87,11 @@ class Router internal constructor(private val routing: Routing, private val NONE_PARTITION = None internal fun constructMessageSinks(routing: Routing, - sinkProvider: SinkProvider, + sinkFactory: SinkFactory, ctx: ClientContext) = routing.map(Route::sink) .distinctBy { it.topicName() } - .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) + .associateBy({ it.topicName() }, { sinkFactory(it, ctx) }) } private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt index 86980832..9df1af31 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender import org.onap.dcae.collectors.veshv.model.ClientContext @@ -37,7 +37,7 @@ import java.util.Collections.synchronizedMap * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -internal class KafkaSinkProvider : SinkProvider { +internal class KafkaSinkFactory : SinkFactory { private val messageSinks = synchronizedMap( mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>() ) @@ -58,6 +58,6 @@ internal class KafkaSinkProvider : SinkProvider { } companion object { - private val logger = Logger(KafkaSinkProvider::class) + private val logger = Logger(KafkaSinkFactory::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a208384a..7ce86f98 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,7 +23,7 @@ import arrow.core.Option import arrow.core.getOrElse import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration @@ -51,7 +51,7 @@ import java.time.Duration */ internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, private val sslContext: Option<SslContext>, - private val collectorProvider: CollectorProvider, + private val collectorFactory: CollectorFactory, private val metrics: Metrics) : Server { override fun start(): Mono<ServerHandle> = @@ -67,7 +67,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati } private fun closeAction(): Mono<Void> = - collectorProvider.close().doOnSuccess { + collectorFactory.close().doOnSuccess { logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } } @@ -118,7 +118,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> { metrics.notifyClientConnected() logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } - val collector = collectorProvider(clientContext) + val collector = collectorFactory(clientContext) return collector.handleClient(clientContext, nettyInbound) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 95b9159e..8b2bc13c 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -19,17 +19,16 @@ */ package org.onap.dcae.collectors.veshv.tests.component -import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink @@ -51,17 +50,15 @@ import java.util.concurrent.atomic.AtomicBoolean class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable { val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() - val sinkProvider = DummySinkProvider(sink) + val sinkProvider = DummySinkFactory(sink) - private val collectorFactory = CollectorFactory( + private val collectorProvider = HvVesCollectorFactory( configuration, sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES ) - private val collectorProvider = collectorFactory.createVesHvCollectorProvider() - val collector: Collector get() = collectorProvider(ClientContext(alloc)) @@ -82,7 +79,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C } } -class DummySinkProvider(private val sink: Sink) : SinkProvider { +class DummySinkFactory(private val sink: Sink) : SinkFactory { private val sinkInitialized = AtomicBoolean(false) override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index fc4d8662..a34b7118 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -21,13 +21,12 @@ package org.onap.dcae.collectors.veshv.main.servers import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration -import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.factory.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono @@ -45,8 +44,7 @@ object VesServer { .doOnNext(::logServerStarted) private fun createVesServer(config: HvVesConfiguration): Server = - initializeCollectorFactory(config) - .createVesHvCollectorProvider() + createCollectorProvider(config) .let { collectorProvider -> ServerFactory.createNettyTcpServer( config.server, @@ -56,8 +54,8 @@ object VesServer { ) } - private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = - CollectorFactory( + private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory = + HvVesCollectorFactory( config.collector, AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, |