aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt)32
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt)6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt8
8 files changed, 31 insertions, 41 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index e3156a0d..48f335a1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -35,7 +35,7 @@ interface Sink : Closeable {
fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
-interface SinkProvider : Closeable {
+interface SinkFactory : Closeable {
operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 0039ef62..4c54d7d2 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -30,7 +30,7 @@ interface Collector {
fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-interface CollectorProvider : Closeable {
+interface CollectorFactory : Closeable {
operator fun invoke(ctx: ClientContext): Collector
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
index 04e575ae..70f61b6c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
@@ -19,13 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
+ fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory()
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
index 8fb4e80d..3524f14c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
@@ -20,46 +20,36 @@
package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(private val configuration: CollectorConfiguration,
- private val sinkProvider: SinkProvider,
- private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int) {
+class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
+ private val sinkFactory: SinkFactory,
+ private val metrics: Metrics,
+ private val maxPayloadSizeBytes: Int): CollectorFactory {
- fun createVesHvCollectorProvider(): CollectorProvider {
+ override fun invoke(ctx: ClientContext): Collector =
+ createVesHvCollector(ctx)
- return object : CollectorProvider {
- override fun invoke(ctx: ClientContext): Collector =
- createVesHvCollector(ctx)
-
- override fun close() = sinkProvider.close()
- }
- }
+ override fun close() = sinkFactory.close()
private fun createVesHvCollector(ctx: ClientContext): Collector =
HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(configuration.routing, sinkProvider, ctx, metrics),
+ router = Router(configuration.routing, sinkFactory, ctx, metrics),
metrics = metrics)
-
- companion object {
- private val logger = Logger(CollectorFactory::class)
- }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index 6c4e4671..e0f611b6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
@@ -37,12 +37,12 @@ object ServerFactory {
fun createNettyTcpServer(serverConfig: ServerConfiguration,
securityConfig: SecurityConfiguration,
- collectorProvider: CollectorProvider,
+ collectorFactory: CollectorFactory,
metrics: Metrics
): Server = NettyTcpServer(
serverConfig,
sslFactory.createServerContext(securityConfig),
- collectorProvider,
+ collectorFactory,
metrics
)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index b03b89e1..fe34a9c7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -23,7 +23,7 @@ import arrow.core.None
import arrow.core.toOption
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,11 +40,11 @@ class Router internal constructor(private val routing: Routing,
private val ctx: ClientContext,
private val metrics: Metrics) {
constructor(routing: Routing,
- sinkProvider: SinkProvider,
+ sinkFactory: SinkFactory,
ctx: ClientContext,
metrics: Metrics) :
this(routing,
- constructMessageSinks(routing, sinkProvider, ctx),
+ constructMessageSinks(routing, sinkFactory, ctx),
ctx,
metrics) {
logger.debug(ctx::mdc) { "Routing for client: $routing" }
@@ -87,11 +87,11 @@ class Router internal constructor(private val routing: Routing,
private val NONE_PARTITION = None
internal fun constructMessageSinks(routing: Routing,
- sinkProvider: SinkProvider,
+ sinkFactory: SinkFactory,
ctx: ClientContext) =
routing.map(Route::sink)
.distinctBy { it.topicName() }
- .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
+ .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
}
private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
index 86980832..9df1af31 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -37,7 +37,7 @@ import java.util.Collections.synchronizedMap
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
+internal class KafkaSinkFactory : SinkFactory {
private val messageSinks = synchronizedMap(
mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
)
@@ -58,6 +58,6 @@ internal class KafkaSinkProvider : SinkProvider {
}
companion object {
- private val logger = Logger(KafkaSinkProvider::class)
+ private val logger = Logger(KafkaSinkFactory::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index a208384a..7ce86f98 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -23,7 +23,7 @@ import arrow.core.Option
import arrow.core.getOrElse
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
@@ -51,7 +51,7 @@ import java.time.Duration
*/
internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
private val sslContext: Option<SslContext>,
- private val collectorProvider: CollectorProvider,
+ private val collectorFactory: CollectorFactory,
private val metrics: Metrics) : Server {
override fun start(): Mono<ServerHandle> =
@@ -67,7 +67,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
}
private fun closeAction(): Mono<Void> =
- collectorProvider.close().doOnSuccess {
+ collectorFactory.close().doOnSuccess {
logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
}
@@ -118,7 +118,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
- val collector = collectorProvider(clientContext)
+ val collector = collectorFactory(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}