From 162506c71c90bb0b24005f81ac0673820cf57421 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 7 Jun 2018 08:46:51 +0200 Subject: Add SSL/TLS to client simulator Change-Id: Iedebd222be08931b95e52a84c8c4d9c0df9e1da1 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 17 +++++++++++++++-- .../veshv/impl/adapters/AdapterFactory.kt | 21 +++++++++++++++++++++ .../collectors/veshv/impl/socket/NettyTcpServer.kt | 7 ++++--- 3 files changed, 40 insertions(+), 5 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 9cade1cc..535fbe12 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -39,7 +40,9 @@ internal class VesHvCollector( private val sink: Sink) : Collector { override fun handleConnection(dataStream: Flux): Mono = dataStream + .doOnNext(this::logIncomingMessage) .flatMap(this::decodeWire) + .doOnNext(this::logDecodedWireMessage) .flatMap(this::decodeProtobuf) .filter(this::validate) .flatMap(this::findRoute) @@ -47,6 +50,14 @@ internal class VesHvCollector( .doOnNext(this::releaseMemory) .then() + private fun logIncomingMessage(wire: ByteBuf) { + logger.debug { "Got message with total ${wire.readableBytes()} B"} + } + + private fun logDecodedWireMessage(payload: ByteBuf) { + logger.debug { "Wire payload size: ${payload.readableBytes()} B"} + } + private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) @@ -65,8 +76,6 @@ internal class VesHvCollector( msg.rawMessage.release() } - - private fun omitWhenNull(input: T, mapper: (T) -> V?): Mono = Mono.justOrEmpty(mapper(input)) private fun releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono { @@ -78,4 +87,8 @@ internal class VesHvCollector( Mono.just(result) } } + + companion object { + val logger = Logger(VesHvCollector::class) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8a34185f..0aacb266 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -28,6 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient @@ -41,6 +44,7 @@ import java.nio.ByteBuffer */ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() + fun loggingSink(): SinkProvider = LoggingSinkProvider() fun staticConfigurationProvider(config: CollectorConfiguration) = object : ConfigurationProvider { @@ -58,8 +62,25 @@ object AdapterFactory { } } + + private class LoggingSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + override fun send(messages: Flux): Flux = + messages + .doOnNext { msg -> + logger.info { "Message routed to ${msg.topic}" } + } + .map { it.message } + + } + } + } + fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) + fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 415aa217..208b1ba0 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -59,14 +59,15 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono { logger.debug("Got connection") - val pipe = collectorProvider().handleConnection(nettyInbound.receive()) - val hello = nettyOutbound + val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - return hello.then(pipe) + val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + + return sendHello.then(handleIncomingMessages) } companion object { -- cgit 1.2.3-korg