aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt)4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt25
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt49
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt21
8 files changed, 69 insertions, 59 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ba0a9eee..0039ef62 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -36,5 +35,5 @@ interface CollectorProvider : Closeable {
}
interface Server {
- fun start(): IO<ServerHandle>
+ fun start(): Mono<ServerHandle>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 1c79abd2..8fb4e80d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,10 +25,9 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -53,7 +52,7 @@ class CollectorFactory(private val configuration: CollectorConfiguration,
}
private fun createVesHvCollector(ctx: ClientContext): Collector =
- VesHvCollector(
+ HvVesCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index 618b818f..7d8f0cb1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -44,7 +44,7 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class VesHvCollector(
+internal class HvVesCollector(
private val clientContext: ClientContext,
private val wireChunkDecoder: WireChunkDecoder,
private val protobufDecoder: VesDecoder,
@@ -116,6 +116,6 @@ internal class VesHvCollector(
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
- private val logger = Logger(VesHvCollector::class)
+ private val logger = Logger(HvVesCollector::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index 6e2e20f7..b03b89e1 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -53,20 +53,27 @@ class Router internal constructor(private val routing: Routing,
fun route(message: VesMessage): Flux<ConsumedMessage> =
routeFor(message.header)
- .fold({
- metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
- logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
- logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
- Flux.empty<Route>()
- }, {
- logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
- Flux.just(it)
- })
+ .fold({ routeNotFound(message) }, { routeFound(message, it) })
.flatMap {
val sinkTopic = it.sink.topicName()
messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
}
+ private fun routeNotFound(message: VesMessage): Flux<Route> {
+ metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+ logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+ logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
+ return Flux.empty<Route>()
+ }
+
+ private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
+ logger.trace(ctx::fullMdc) {
+ "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
+ }
+ return Flux.just(route)
+ }
+
+
private fun routeFor(header: CommonEventHeader) =
routing.find { it.domain == header.domain }.toOption()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 2ce0f42f..7b726ab4 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -19,16 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 7a498652..86980832 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
@@ -28,6 +27,9 @@ import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
import reactor.kafka.sender.KafkaSender
import java.util.Collections.synchronizedMap
@@ -46,10 +48,14 @@ internal class KafkaSinkProvider : SinkProvider {
}
}
- override fun close() = IO {
- messageSinks.values.forEach { it.close() }
- logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
- }
+ override fun close(): Mono<Void> =
+ Flux.fromIterable(messageSinks.values)
+ .publishOn(Schedulers.elastic())
+ .doOnNext(KafkaSender<CommonEventHeader, VesMessage>::close)
+ .then()
+ .doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
+ }
companion object {
private val logger = Logger(KafkaSinkProvider::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 3e19414d..a208384a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.impl.socket
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
@@ -55,17 +54,23 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private val collectorProvider: CollectorProvider,
private val metrics: Metrics) : Server {
- override fun start(): IO<ServerHandle> = IO {
- TcpServer.create()
- .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
- .configureSsl()
- .handle(this::handleConnection)
- .doOnUnbound {
- logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
- collectorProvider.close().unsafeRunSync()
- }
- .let { NettyServerHandle(it.bindNow()) }
- }
+ override fun start(): Mono<ServerHandle> =
+ Mono.defer {
+ TcpServer.create()
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
+ .configureSsl()
+ .handle(this::handleConnection)
+ .bind()
+ .map {
+ NettyServerHandle(it, closeAction())
+ }
+ }
+
+ private fun closeAction(): Mono<Void> =
+ collectorProvider.close().doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ }
+
private fun TcpServer.configureSsl() =
sslContext
@@ -86,7 +91,7 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
withNewClientContextFrom(nettyInbound, nettyOutbound)
{ clientContext ->
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc) { "Client connection request received" }
clientContext.clientAddress
.map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
@@ -112,20 +117,20 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
- logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
val collector = collectorProvider(clientContext)
return collector.handleClient(clientContext, nettyInbound)
}
private fun Collector.handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- connection
- .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
- .logConnectionClosed(clientContext)
- }.run {
- handleConnection(nettyInbound.createDataStream())
- }
+ nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ connection
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ handleConnection(nettyInbound.createDataStream())
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index b735138d..ca9d28ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -19,17 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.wire
-import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Flux.defer
import reactor.core.publisher.SynchronousSink
@@ -63,26 +62,22 @@ internal class WireChunkDecoder(
private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
- .unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err ->
when (err) {
- is InvalidWireFrame -> IO {
+ is InvalidWireFrame ->
next.error(WireFrameException(err))
- }
- is MissingWireFrameBytes -> IO {
+ is MissingWireFrameBytes -> {
logEndOfData()
next.complete()
}
}
}
- private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame ->
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
private fun logIncomingMessage(wire: ByteBuf) {