From c7a3e0738abf581640059587dbb81790339340c9 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 3 Apr 2019 12:12:17 +0200 Subject: Simplify factory/provider logic Change-Id: I59467c41e1de63ead7c190a7c8fd688e3216237a Issue-ID: DCAEGEN2-1385 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/boundary/adapters.kt | 2 +- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 2 +- .../collectors/veshv/factory/AdapterFactory.kt | 6 +- .../collectors/veshv/factory/CollectorFactory.kt | 65 ---------------------- .../veshv/factory/HvVesCollectorFactory.kt | 55 ++++++++++++++++++ .../dcae/collectors/veshv/factory/ServerFactory.kt | 6 +- .../org/onap/dcae/collectors/veshv/impl/Router.kt | 10 ++-- .../veshv/impl/adapters/kafka/KafkaSinkFactory.kt | 63 +++++++++++++++++++++ .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 63 --------------------- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 8 +-- 10 files changed, 135 insertions(+), 145 deletions(-) delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt (limited to 'sources/hv-collector-core/src') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index e3156a0d..48f335a1 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -35,7 +35,7 @@ interface Sink : Closeable { fun send(messages: Flux): Flux } -interface SinkProvider : Closeable { +interface SinkFactory : Closeable { operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 0039ef62..4c54d7d2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -30,7 +30,7 @@ interface Collector { fun handleConnection(dataStream: Flux): Mono } -interface CollectorProvider : Closeable { +interface CollectorFactory : Closeable { operator fun invoke(ctx: ClientContext): Collector } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt index 04e575ae..70f61b6c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt @@ -19,13 +19,13 @@ */ package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory /** * @author Piotr Jaszczyk * @since May 2018 */ object AdapterFactory { - fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider() + fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory() } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt deleted file mode 100644 index 8fb4e80d..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.factory - -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.impl.Router -import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.HvVesCollector -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -class CollectorFactory(private val configuration: CollectorConfiguration, - private val sinkProvider: SinkProvider, - private val metrics: Metrics, - private val maxPayloadSizeBytes: Int) { - - fun createVesHvCollectorProvider(): CollectorProvider { - - return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Collector = - createVesHvCollector(ctx) - - override fun close() = sinkProvider.close() - } - } - - private fun createVesHvCollector(ctx: ClientContext): Collector = - HvVesCollector( - clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), - protobufDecoder = VesDecoder(), - router = Router(configuration.routing, sinkProvider, ctx, metrics), - metrics = metrics) - - companion object { - private val logger = Logger(CollectorFactory::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt new file mode 100644 index 00000000..3524f14c --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.factory + +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.boundary.SinkFactory +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.impl.HvVesCollector +import org.onap.dcae.collectors.veshv.impl.Router +import org.onap.dcae.collectors.veshv.impl.VesDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.ClientContext + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +class HvVesCollectorFactory(private val configuration: CollectorConfiguration, + private val sinkFactory: SinkFactory, + private val metrics: Metrics, + private val maxPayloadSizeBytes: Int): CollectorFactory { + + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) + + override fun close() = sinkFactory.close() + + private fun createVesHvCollector(ctx: ClientContext): Collector = + HvVesCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(configuration.routing, sinkFactory, ctx, metrics), + metrics = metrics) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt index 6c4e4671..e0f611b6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.factory -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration @@ -37,12 +37,12 @@ object ServerFactory { fun createNettyTcpServer(serverConfig: ServerConfiguration, securityConfig: SecurityConfiguration, - collectorProvider: CollectorProvider, + collectorFactory: CollectorFactory, metrics: Metrics ): Server = NettyTcpServer( serverConfig, sslFactory.createServerContext(securityConfig), - collectorProvider, + collectorFactory, metrics ) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index b03b89e1..fe34a9c7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -23,7 +23,7 @@ import arrow.core.None import arrow.core.toOption import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,11 +40,11 @@ class Router internal constructor(private val routing: Routing, private val ctx: ClientContext, private val metrics: Metrics) { constructor(routing: Routing, - sinkProvider: SinkProvider, + sinkFactory: SinkFactory, ctx: ClientContext, metrics: Metrics) : this(routing, - constructMessageSinks(routing, sinkProvider, ctx), + constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) { logger.debug(ctx::mdc) { "Routing for client: $routing" } @@ -87,11 +87,11 @@ class Router internal constructor(private val routing: Routing, private val NONE_PARTITION = None internal fun constructMessageSinks(routing: Routing, - sinkProvider: SinkProvider, + sinkFactory: SinkFactory, ctx: ClientContext) = routing.map(Route::sink) .distinctBy { it.topicName() } - .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) + .associateBy({ it.topicName() }, { sinkFactory(it, ctx) }) } private fun Lazy.send(message: RoutedMessage) = value.send(message) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt new file mode 100644 index 00000000..9df1af31 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.onap.dcae.collectors.veshv.boundary.SinkFactory +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.impl.createKafkaSender +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.kafka.sender.KafkaSender +import java.util.Collections.synchronizedMap + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +internal class KafkaSinkFactory : SinkFactory { + private val messageSinks = synchronizedMap( + mutableMapOf>() + ) + + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { + KafkaPublisher(it, ctx) + } + } + + override fun close(): Mono = + Flux.fromIterable(messageSinks.values) + .publishOn(Schedulers.elastic()) + .doOnNext(KafkaSender::close) + .then() + .doOnSuccess { + logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } + } + + companion object { + private val logger = Logger(KafkaSinkFactory::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt deleted file mode 100644 index 86980832..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.impl.createKafkaSender -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.model.streams.SinkStream -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.scheduler.Schedulers -import reactor.kafka.sender.KafkaSender -import java.util.Collections.synchronizedMap - -/** - * @author Piotr Jaszczyk - * @since June 2018 - */ -internal class KafkaSinkProvider : SinkProvider { - private val messageSinks = synchronizedMap( - mutableMapOf>() - ) - - override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { - messageSinks.computeIfAbsent(stream, ::createKafkaSender).let { - KafkaPublisher(it, ctx) - } - } - - override fun close(): Mono = - Flux.fromIterable(messageSinks.values) - .publishOn(Schedulers.elastic()) - .doOnNext(KafkaSender::close) - .then() - .doOnSuccess { - logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" } - } - - companion object { - private val logger = Logger(KafkaSinkProvider::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a208384a..7ce86f98 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -23,7 +23,7 @@ import arrow.core.Option import arrow.core.getOrElse import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration @@ -51,7 +51,7 @@ import java.time.Duration */ internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, private val sslContext: Option, - private val collectorProvider: CollectorProvider, + private val collectorFactory: CollectorFactory, private val metrics: Metrics) : Server { override fun start(): Mono = @@ -67,7 +67,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati } private fun closeAction(): Mono = - collectorProvider.close().doOnSuccess { + collectorFactory.close().doOnSuccess { logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } } @@ -118,7 +118,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { metrics.notifyClientConnected() logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } - val collector = collectorProvider(clientContext) + val collector = collectorFactory(clientContext) return collector.handleClient(clientContext, nettyInbound) } -- cgit 1.2.3-korg