diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-07 08:46:51 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 12:34:27 +0200 |
commit | 162506c71c90bb0b24005f81ac0673820cf57421 (patch) | |
tree | ced018d2917b57a5ed0dd68d94c0c19c94a9c5e5 /hv-collector-core | |
parent | 85439499beabf26902ebd670e6855bdaa18f470b (diff) |
Add SSL/TLS to client simulator
Change-Id: Iedebd222be08931b95e52a84c8c4d9c0df9e1da1
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
3 files changed, 40 insertions, 5 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 9cade1cc..535fbe12 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -39,7 +40,9 @@ internal class VesHvCollector( private val sink: Sink) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = dataStream + .doOnNext(this::logIncomingMessage) .flatMap(this::decodeWire) + .doOnNext(this::logDecodedWireMessage) .flatMap(this::decodeProtobuf) .filter(this::validate) .flatMap(this::findRoute) @@ -47,6 +50,14 @@ internal class VesHvCollector( .doOnNext(this::releaseMemory) .then() + private fun logIncomingMessage(wire: ByteBuf) { + logger.debug { "Got message with total ${wire.readableBytes()} B"} + } + + private fun logDecodedWireMessage(payload: ByteBuf) { + logger.debug { "Wire payload size: ${payload.readableBytes()} B"} + } + private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) @@ -65,8 +76,6 @@ internal class VesHvCollector( msg.rawMessage.release() } - - private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { @@ -78,4 +87,8 @@ internal class VesHvCollector( Mono.just(result) } } + + companion object { + val logger = Logger(VesHvCollector::class) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8a34185f..0aacb266 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -28,6 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient @@ -41,6 +44,7 @@ import java.nio.ByteBuffer */ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() + fun loggingSink(): SinkProvider = LoggingSinkProvider() fun staticConfigurationProvider(config: CollectorConfiguration) = object : ConfigurationProvider { @@ -58,8 +62,25 @@ object AdapterFactory { } } + + private class LoggingSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + messages + .doOnNext { msg -> + logger.info { "Message routed to ${msg.topic}" } + } + .map { it.message } + + } + } + } + fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) + fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 415aa217..208b1ba0 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -59,14 +59,15 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { logger.debug("Got connection") - val pipe = collectorProvider().handleConnection(nettyInbound.receive()) - val hello = nettyOutbound + val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - return hello.then(pipe) + val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + + return sendHello.then(handleIncomingMessages) } companion object { |