diff options
Diffstat (limited to 'hv-collector-core')
3 files changed, 40 insertions, 5 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 9cade1cc..535fbe12 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -39,7 +40,9 @@ internal class VesHvCollector( private val sink: Sink) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = dataStream + .doOnNext(this::logIncomingMessage) .flatMap(this::decodeWire) + .doOnNext(this::logDecodedWireMessage) .flatMap(this::decodeProtobuf) .filter(this::validate) .flatMap(this::findRoute) @@ -47,6 +50,14 @@ internal class VesHvCollector( .doOnNext(this::releaseMemory) .then() + private fun logIncomingMessage(wire: ByteBuf) { + logger.debug { "Got message with total ${wire.readableBytes()} B"} + } + + private fun logDecodedWireMessage(payload: ByteBuf) { + logger.debug { "Wire payload size: ${payload.readableBytes()} B"} + } + private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) @@ -65,8 +76,6 @@ internal class VesHvCollector( msg.rawMessage.release() } - - private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { @@ -78,4 +87,8 @@ internal class VesHvCollector( Mono.just(result) } } + + companion object { + val logger = Logger(VesHvCollector::class) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8a34185f..0aacb266 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -28,6 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient @@ -41,6 +44,7 @@ import java.nio.ByteBuffer */ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() + fun loggingSink(): SinkProvider = LoggingSinkProvider() fun staticConfigurationProvider(config: CollectorConfiguration) = object : ConfigurationProvider { @@ -58,8 +62,25 @@ object AdapterFactory { } } + + private class LoggingSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + messages + .doOnNext { msg -> + logger.info { "Message routed to ${msg.topic}" } + } + .map { it.message } + + } + } + } + fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) + fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 415aa217..208b1ba0 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -59,14 +59,15 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { logger.debug("Got connection") - val pipe = collectorProvider().handleConnection(nettyInbound.receive()) - val hello = nettyOutbound + val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - return hello.then(pipe) + val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + + return sendHello.then(handleIncomingMessages) } companion object { |