aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/pom.xml1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt16
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt7
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt6
6 files changed, 27 insertions, 14 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index ed9f1ad5..a372fb22 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -73,7 +73,6 @@
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
-
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 53fd7c3a..e4f02000 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import reactor.core.publisher.Flux
interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<VesMessage>
+ fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+}
+
+interface Metrics {
+ fun notifyBytesReceived(size: Int)
+ fun notifyMessageReceived(size: Int)
+ fun notifyMessageSent(topic: String)
}
@FunctionalInterface
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 73f4d09d..8785180b 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.impl.MessageValidator
@@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference
* @since May 2018
*/
class CollectorFactory(val configuration: ConfigurationProvider,
- private val sinkProvider: SinkProvider) {
+ private val sinkProvider: SinkProvider,
+ private val metrics: Metrics) {
+
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
createVesHvCollector().subscribe(collector::set)
@@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
- VesDecoder(),
- MessageValidator(),
- Router(config.routing),
- sinkProvider(config))
+ wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+ protobufDecoder = VesDecoder(),
+ validator = MessageValidator(),
+ router = Router(config.routing),
+ sink = sinkProvider(config),
+ metrics = metrics)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 965943f6..222eaefa 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -40,18 +41,22 @@ internal class VesHvCollector(
private val protobufDecoder: VesDecoder,
private val validator: MessageValidator,
private val router: Router,
- private val sink: Sink) : Collector {
+ private val sink: Sink,
+ private val metrics: Metrics) : Collector {
override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
+ .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireDecoder::decode)
+ .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
.filter(WireFrame::isValid)
.map(WireFrame::payload)
.map(protobufDecoder::decode)
.filter(validator::isValid)
.flatMap(this::findRoute)
.compose(sink::send)
+ .doOnNext { metrics.notifyMessageSent(it.topic) }
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
.then()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index b943e4e5..a5c41046 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
messages
.doOnNext(this::logMessage)
- .map { it.message }
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 6142fa3c..0a548a52 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
return sender.send(records)
.doOnNext(::logException)
@@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
.map { it.correlationMetadata() }
}
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> {
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
return SenderRecord.create(
msg.topic,
msg.partition,
System.currentTimeMillis(),
msg.message.header,
msg.message,
- msg.message)
+ msg)
}
private fun logException(senderResult: SenderResult<out Any>) {