summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt)32
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt)6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt8
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt13
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt10
10 files changed, 40 insertions, 55 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index e3156a0d..48f335a1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -35,7 +35,7 @@ interface Sink : Closeable {
fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
-interface SinkProvider : Closeable {
+interface SinkFactory : Closeable {
operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 0039ef62..4c54d7d2 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -30,7 +30,7 @@ interface Collector {
fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
}
-interface CollectorProvider : Closeable {
+interface CollectorFactory : Closeable {
operator fun invoke(ctx: ClientContext): Collector
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
index 04e575ae..70f61b6c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
@@ -19,13 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkFactory
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
object AdapterFactory {
- fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
+ fun sinkCreatorFactory(): SinkFactory = KafkaSinkFactory()
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
index 8fb4e80d..3524f14c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
@@ -20,46 +20,36 @@
package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(private val configuration: CollectorConfiguration,
- private val sinkProvider: SinkProvider,
- private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int) {
+class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
+ private val sinkFactory: SinkFactory,
+ private val metrics: Metrics,
+ private val maxPayloadSizeBytes: Int): CollectorFactory {
- fun createVesHvCollectorProvider(): CollectorProvider {
+ override fun invoke(ctx: ClientContext): Collector =
+ createVesHvCollector(ctx)
- return object : CollectorProvider {
- override fun invoke(ctx: ClientContext): Collector =
- createVesHvCollector(ctx)
-
- override fun close() = sinkProvider.close()
- }
- }
+ override fun close() = sinkFactory.close()
private fun createVesHvCollector(ctx: ClientContext): Collector =
HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(configuration.routing, sinkProvider, ctx, metrics),
+ router = Router(configuration.routing, sinkFactory, ctx, metrics),
metrics = metrics)
-
- companion object {
- private val logger = Logger(CollectorFactory::class)
- }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index 6c4e4671..e0f611b6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
@@ -37,12 +37,12 @@ object ServerFactory {
fun createNettyTcpServer(serverConfig: ServerConfiguration,
securityConfig: SecurityConfiguration,
- collectorProvider: CollectorProvider,
+ collectorFactory: CollectorFactory,
metrics: Metrics
): Server = NettyTcpServer(
serverConfig,
sslFactory.createServerContext(securityConfig),
- collectorProvider,
+ collectorFactory,
metrics
)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index b03b89e1..fe34a9c7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -23,7 +23,7 @@ import arrow.core.None
import arrow.core.toOption
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,11 +40,11 @@ class Router internal constructor(private val routing: Routing,
private val ctx: ClientContext,
private val metrics: Metrics) {
constructor(routing: Routing,
- sinkProvider: SinkProvider,
+ sinkFactory: SinkFactory,
ctx: ClientContext,
metrics: Metrics) :
this(routing,
- constructMessageSinks(routing, sinkProvider, ctx),
+ constructMessageSinks(routing, sinkFactory, ctx),
ctx,
metrics) {
logger.debug(ctx::mdc) { "Routing for client: $routing" }
@@ -87,11 +87,11 @@ class Router internal constructor(private val routing: Routing,
private val NONE_PARTITION = None
internal fun constructMessageSinks(routing: Routing,
- sinkProvider: SinkProvider,
+ sinkFactory: SinkFactory,
ctx: ClientContext) =
routing.map(Route::sink)
.distinctBy { it.topicName() }
- .associateBy({ it.topicName() }, { sinkProvider(it, ctx) })
+ .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
}
private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
index 86980832..9df1af31 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -37,7 +37,7 @@ import java.util.Collections.synchronizedMap
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
+internal class KafkaSinkFactory : SinkFactory {
private val messageSinks = synchronizedMap(
mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
)
@@ -58,6 +58,6 @@ internal class KafkaSinkProvider : SinkProvider {
}
companion object {
- private val logger = Logger(KafkaSinkProvider::class)
+ private val logger = Logger(KafkaSinkFactory::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index a208384a..7ce86f98 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -23,7 +23,7 @@ import arrow.core.Option
import arrow.core.getOrElse
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
@@ -51,7 +51,7 @@ import java.time.Duration
*/
internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
private val sslContext: Option<SslContext>,
- private val collectorProvider: CollectorProvider,
+ private val collectorFactory: CollectorFactory,
private val metrics: Metrics) : Server {
override fun start(): Mono<ServerHandle> =
@@ -67,7 +67,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
}
private fun closeAction(): Mono<Void> =
- collectorProvider.close().doOnSuccess {
+ collectorFactory.close().doOnSuccess {
logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
}
@@ -118,7 +118,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
- val collector = collectorProvider(clientContext)
+ val collector = collectorFactory(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 95b9159e..8b2bc13c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -19,17 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
@@ -51,17 +50,15 @@ import java.util.concurrent.atomic.AtomicBoolean
class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
- val sinkProvider = DummySinkProvider(sink)
+ val sinkProvider = DummySinkFactory(sink)
- private val collectorFactory = CollectorFactory(
+ private val collectorProvider = HvVesCollectorFactory(
configuration,
sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES
)
- private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
-
val collector: Collector
get() = collectorProvider(ClientContext(alloc))
@@ -82,7 +79,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
}
}
-class DummySinkProvider(private val sink: Sink) : SinkProvider {
+class DummySinkFactory(private val sink: Sink) : SinkFactory {
private val sinkInitialized = AtomicBoolean(false)
override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index fc4d8662..a34b7118 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -21,13 +21,12 @@ package org.onap.dcae.collectors.veshv.main.servers
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.factory.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
@@ -45,8 +44,7 @@ object VesServer {
.doOnNext(::logServerStarted)
private fun createVesServer(config: HvVesConfiguration): Server =
- initializeCollectorFactory(config)
- .createVesHvCollectorProvider()
+ createCollectorProvider(config)
.let { collectorProvider ->
ServerFactory.createNettyTcpServer(
config.server,
@@ -56,8 +54,8 @@ object VesServer {
)
}
- private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
- CollectorFactory(
+ private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory =
+ HvVesCollectorFactory(
config.collector,
AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,