diff options
9 files changed, 173 insertions, 31 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt index 1bf9046f..628afdad 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt @@ -24,10 +24,15 @@ import org.apache.commons.cli.Options import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.CommandLine import org.apache.commons.cli.HelpFormatter +import java.io.File +import java.nio.file.Paths internal object DefaultValues { const val MESSAGES_AMOUNT = 1 + const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" + const val CERT_FILE = "/etc/ves-hv/client.crt" + const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" } /** @@ -56,29 +61,67 @@ internal object ArgBasedClientConfiguration { .desc("Amount of messages to send") .build() + private val OPT_PK_FILE = Option.builder("k") + .longOpt("private-key-file") + .hasArg() + .desc("File with client private key in PEM format") + .build() + + private val OPT_CERT_FILE = Option.builder("e") + .longOpt("cert-file") + .hasArg() + .desc("File with client certificate bundle") + .build() + + private val OPT_TRUST_CERT_FILE = Option.builder("t") + .longOpt("trust-cert-file") + .hasArg() + .desc("File with trusted certificate bundle for trusting servers") + .build() + private val options by lazy { val options = Options() options.addOption(OPT_VES_PORT) options.addOption(OPT_VES_HOST) options.addOption(OPT_MESSAGES_AMOUNT) + options.addOption(OPT_PK_FILE) + options.addOption(OPT_CERT_FILE) + options.addOption(OPT_TRUST_CERT_FILE) options } fun parse(args: Array<out String>): ClientConfiguration { + + val parser = DefaultParser() try { - parser.parse(options, args).run { - return ClientConfiguration( - stringValue(OPT_VES_HOST), - intValue(OPT_VES_PORT), - intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)) - } + val cmdLine = parser.parse(options, args) + val host = cmdLine.stringValue(OPT_VES_HOST) + val port = cmdLine.intValue(OPT_VES_PORT) + val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT) + return ClientConfiguration( + host, + port, + parseSecurityConfig(cmdLine), + msgsAmount) } catch (ex: Exception) { throw WrongArgumentException(ex) } } + private fun parseSecurityConfig(cmdLine: CommandLine): ClientSecurityConfiguration { + val pkFile = cmdLine.stringValue(OPT_PK_FILE, DefaultValues.PRIVATE_KEY_FILE) + val certFile = cmdLine.stringValue(OPT_CERT_FILE, DefaultValues.CERT_FILE) + val trustCertFile = cmdLine.stringValue(OPT_TRUST_CERT_FILE, DefaultValues.TRUST_CERT_FILE) + return ClientSecurityConfiguration( + privateKey = stringPathToPath(pkFile), + cert = stringPathToPath(certFile), + trustedCert = stringPathToPath(trustCertFile)) + } + + private fun stringPathToPath(path: String) = Paths.get(File(path).toURI()) + private fun CommandLine.intValueOrDefault(option: Option, default: Int) = getOptionValue(option.opt)?.toInt() ?: default @@ -88,12 +131,11 @@ internal object ArgBasedClientConfiguration { private fun CommandLine.stringValue(option: Option) = getOptionValue(option.opt) + private fun CommandLine.stringValue(option: Option, default: String) = + getOptionValue(option.opt) ?: default - class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) { - fun printMessage() { - println(message) - } + class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) { fun printHelp(programName: String) { val formatter = HelpFormatter() formatter.printHelp(programName, options) diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt index 189dff63..e3cba57b 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt @@ -23,4 +23,8 @@ package org.onap.dcae.collectors.veshv.main.config * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -data class ClientConfiguration( val vesHost: String, val vesPort: Int ,val messagesAmount: Int) +data class ClientConfiguration( + val vesHost: String, + val vesPort: Int, + val security: ClientSecurityConfiguration, + val messagesAmount: Int) diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientSecurityConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientSecurityConfiguration.kt new file mode 100644 index 00000000..fc7cf665 --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientSecurityConfiguration.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.config + +import java.nio.file.Path + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +data class ClientSecurityConfiguration( + val privateKey: Path, + val cert: Path, + val trustedCert: Path) diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt index 108b664f..4553ab20 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt @@ -19,9 +19,13 @@ */ package org.onap.dcae.collectors.veshv.main.impl -import io.netty.buffer.ByteBufAllocator +import io.netty.handler.ssl.ClientAuth +import io.netty.handler.ssl.SslContext +import io.netty.handler.ssl.SslContextBuilder +import io.netty.handler.ssl.SslProvider import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration +import org.onap.dcae.collectors.veshv.main.config.ClientSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Flux @@ -37,11 +41,16 @@ import java.util.function.BiFunction */ class VesHvClient(configuration: ClientConfiguration) { - private val logger = Logger(VesHvClient::class) - private val client: TcpClient = TcpClient.create(configuration.vesHost, configuration.vesPort) + private val client: TcpClient = TcpClient.builder() + .options { opts -> + opts.host(configuration.vesHost) + .port(configuration.vesPort) + .sslContext(createSslContext(configuration.security)) + } + .build() fun send(messages: Flux<WireFrame>) { - client.start(BiFunction { i, o -> handler(i, o, messages) }) + client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) }) } // sending flux with multiple WireFrames not supported yet @@ -54,8 +63,24 @@ class VesHvClient(configuration: ClientConfiguration) { .asString(Charsets.UTF_8) .subscribe { str -> logger.info("Server response: $str") } + val frames = messages + .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } } + .map { it.encode(nettyOutbound.alloc()) } + return nettyOutbound .options { it.flushOnEach() } - .send(messages.map { it.encode(ByteBufAllocator.DEFAULT) }) + .send(frames) + } + + private fun createSslContext(config: ClientSecurityConfiguration): SslContext = + SslContextBuilder.forClient() + .keyManager(config.cert.toFile(), config.privateKey.toFile()) + .trustManager(config.trustedCert.toFile()) + .sslProvider(SslProvider.OPENSSL) + .clientAuth(ClientAuth.REQUIRE) + .build() + + companion object { + private val logger = Logger(VesHvClient::class) } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt index cd575683..a41035da 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt @@ -31,14 +31,17 @@ private val logger = getLogger("Simulator :: main") * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -fun main(args: Array<String>) = try { - - val clientConfig = ArgBasedClientConfiguration.parse(args) - val messageFactory = MessageFactory() - val client = VesHvClient(clientConfig) - client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) -} catch (e: Exception) { - logger.error(e.localizedMessage) - logger.debug("An error occurred when starting ves client", e) +fun main(args: Array<String>) { + try { + val clientConfig = ArgBasedClientConfiguration.parse(args) + val messageFactory = MessageFactory() + val client = VesHvClient(clientConfig) + client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) + } catch (e: ArgBasedClientConfiguration.WrongArgumentException) { + e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") + } catch (e: Exception) { + logger.error(e.localizedMessage) + logger.debug("An error occurred when starting ves client", e) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 9cade1cc..535fbe12 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -39,7 +40,9 @@ internal class VesHvCollector( private val sink: Sink) : Collector { override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = dataStream + .doOnNext(this::logIncomingMessage) .flatMap(this::decodeWire) + .doOnNext(this::logDecodedWireMessage) .flatMap(this::decodeProtobuf) .filter(this::validate) .flatMap(this::findRoute) @@ -47,6 +50,14 @@ internal class VesHvCollector( .doOnNext(this::releaseMemory) .then() + private fun logIncomingMessage(wire: ByteBuf) { + logger.debug { "Got message with total ${wire.readableBytes()} B"} + } + + private fun logDecodedWireMessage(payload: ByteBuf) { + logger.debug { "Wire payload size: ${payload.readableBytes()} B"} + } + private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) @@ -65,8 +76,6 @@ internal class VesHvCollector( msg.rawMessage.release() } - - private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { @@ -78,4 +87,8 @@ internal class VesHvCollector( Mono.just(result) } } + + companion object { + val logger = Logger(VesHvCollector::class) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8a34185f..0aacb266 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -28,6 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient @@ -41,6 +44,7 @@ import java.nio.ByteBuffer */ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() + fun loggingSink(): SinkProvider = LoggingSinkProvider() fun staticConfigurationProvider(config: CollectorConfiguration) = object : ConfigurationProvider { @@ -58,8 +62,25 @@ object AdapterFactory { } } + + private class LoggingSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + messages + .doOnNext { msg -> + logger.info { "Message routed to ${msg.topic}" } + } + .map { it.message } + + } + } + } + fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) + fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 415aa217..208b1ba0 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -59,14 +59,15 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { logger.debug("Got connection") - val pipe = collectorProvider().handleConnection(nettyInbound.receive()) - val hello = nettyOutbound + val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - return hello.then(pipe) + val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + + return sendHello.then(handleIncomingMessages) } companion object { diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 906441b9..5bd63d8b 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -86,7 +86,9 @@ data class WireFrame(val payload: ByteBuf, return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) } - private const val HEADER_SIZE = 3 + java.lang.Integer.BYTES + private const val HEADER_SIZE = + 3 * java.lang.Byte.BYTES + + 1 * java.lang.Integer.BYTES private const val FF_BYTE: Short = 0xFF private const val SUPPORTED_MAJOR_VERSION: Short = 1 } |