From 67689405071acdad2b26d5112b3662605e474ce9 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 14 Jun 2018 09:48:46 +0200 Subject: Various improvements * Kotlin upgrade * Monad usage on APIs * Idle timeout * Simulator enhancements Closes ONAP-390 Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../xnf/config/ArgBasedClientConfiguration.kt | 16 +++--- .../veshv/simulators/xnf/impl/HttpServer.kt | 62 ++++++++++++++-------- .../veshv/simulators/xnf/impl/VesHvClient.kt | 62 +++++++++++++++++----- .../dcae/collectors/veshv/simulators/xnf/main.kt | 42 ++++++++------- .../main/config/ArgBasedClientConfigurationTest.kt | 10 +++- 5 files changed, 131 insertions(+), 61 deletions(-) (limited to 'hv-collector-client-simulator/src') diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt index b8a4b888..f29b693c 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt @@ -19,16 +19,16 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.config -import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MESSAGES_TO_SEND_AMOUNT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT /** @@ -40,6 +40,8 @@ internal object DefaultValues { const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" const val CERT_FILE = "/etc/ves-hv/client.crt" const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" + const val VES_HV_PORT = 6061 + const val VES_HV_HOST = "veshvcollector" } internal class ArgBasedClientConfiguration : ArgBasedConfiguration(DefaultParser()) { @@ -53,8 +55,8 @@ internal class ArgBasedClientConfiguration : ArgBasedConfiguration = Mono.fromCallable { - RatpackServer.of { - it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers) + fun start(port: Int = DEFAULT_PORT): IO = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) } - }.doOnNext { it.start() } + } private fun configureHandlers(chain: Chain) { - chain.post("simulator") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readObject() } - .map { extractMessageParameters(it) } - .map { MessageFactory.INSTANCE.createMessageFlux(it) } - .onError { handleException(it, ctx) } - .then { - vesClient.send(it) - ctx.response - .status(STATUS_OK) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request accepted") - .build() - .toString()) - } - } + chain + .post("simulator/sync") { ctx -> + createMessageFlux(ctx) + .map { vesClient.sendIo(it) } + .map { it.unsafeRunSync() } + .onError { handleException(it, ctx) } + .then { sendAcceptedResponse(ctx) } + } + .post("simulator/async") { ctx -> + createMessageFlux(ctx) + .map { vesClient.sendRx(it) } + .map { it.subscribeOn(Schedulers.elastic()).subscribe() } + .onError { handleException(it, ctx) } + .then { sendAcceptedResponse(ctx) } + } + } + + private fun createMessageFlux(ctx: Context): Promise> { + return ctx.request.body + .map { Json.createReader(it.inputStream).readObject() } + .map { extractMessageParameters(it) } + .map { MessageFactory.INSTANCE.createMessageFlux(it) } + } + + private fun sendAcceptedResponse(ctx: Context) { + ctx.response + .status(STATUS_OK) + .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() + .add("response", "Request accepted") + .build() + .toString()) } private fun handleException(t: Throwable, ctx: Context) { diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 78a72d93..be351b50 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -19,19 +19,22 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.effects.IO import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher +import reactor.core.publisher.EmitterProcessor import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient @@ -50,28 +53,58 @@ class VesHvClient(private val configuration: ClientConfiguration) { } .build() - fun send(messages: Flux) = - client - .newHandler { _, out -> handler(out, messages) } - .doOnError{logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}")} - .subscribe { logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") } + fun sendIo(messages: Flux) = IO { + sendRx(messages).block() + } + fun sendRx(messages: Flux): Mono { + val complete = ReplayProcessor.create(1) + client + .newHandler { _, output -> handler(complete, messages, output) } + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } - private fun handler(nettyOutbound: NettyOutbound, messages: Flux): Publisher { + private fun handler(complete: ReplayProcessor, messages: Flux, nettyOutbound: NettyOutbound): + Publisher { + val encoder = WireFrameEncoder(nettyOutbound.alloc()) + val context = nettyOutbound.context() + context.onClose { + logger.info { "Connection to ${context.address()} has been closed" } + } - val encoder = WireFrameEncoder(nettyOutbound.alloc()) + // TODO: Close channel after all messages have been sent + // The code bellow doesn't work because it closes the channel earlier and not all are consumed... +// complete.subscribe { +// context.channel().disconnect().addListener { +// if (it.isSuccess) +// logger.info { "Connection closed" } +// else +// logger.warn("Failed to close the connection", it.cause()) +// } +// } val frames = messages .map(encoder::encode) - .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) + .window(MAX_BATCH_SIZE) return nettyOutbound - .options { it.flushOnEach() } - .send(frames) - .then { logger.info("Messages have been sent") } + .options { it.flushOnBoundary() } + .sendGroups(frames) + .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() } private fun createSslContext(config: SecurityConfiguration): SslContext = @@ -83,6 +116,7 @@ class VesHvClient(private val configuration: ClientConfiguration) { .build() companion object { + private const val MAX_BATCH_SIZE = 128 private val logger = Logger(VesHvClient::class) } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 08bc6a71..f2229507 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,35 +19,41 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf +import arrow.core.Failure +import arrow.core.Success +import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException -import org.slf4j.LoggerFactory.getLogger +import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain +import org.onap.dcae.collectors.veshv.utils.logging.Logger -private val logger = getLogger("Simulator :: main") +private val logger = Logger("Simulator :: main") +private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt" /** * @author Jakub Dudycz * @since June 2018 */ fun main(args: Array) { - try { - val clientConfig = ArgBasedClientConfiguration().parse(args) - val vesClient = VesHvClient(clientConfig) + val httpServer = ArgBasedClientConfiguration().parse(args) + .map(::VesHvClient) + .map(::HttpServer) - HttpServer(vesClient) - .start() - .block() - - } catch (e: WrongArgumentException) { - e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") - System.exit(1) - } catch (e: Exception) { - logger.error(e.localizedMessage) - logger.debug("An error occurred when starting ves client", e) - System.exit(2) + when (httpServer) { + is Success -> httpServer.value.start().unsafeRunAsync { + it.fold( + { ex -> + logger.error("Failed to start a server", ex) + }, + { srv -> + logger.info("Started Simulator API server (listening on ${srv.bindHost}:${srv.bindPort})") + } + ) + } + is Failure -> httpServer.handleErrorsInMain(PROGRAM_NAME, logger) } } - diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt index 6420d84d..2746c0a6 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.main.config +import arrow.core.Failure +import arrow.core.Success import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -44,7 +46,13 @@ object ArgBasedClientConfigurationTest : Spek({ cut = ArgBasedClientConfiguration() } - fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + fun parse(vararg cmdLine: String): ClientConfiguration { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> result.value + is Failure -> throw AssertionError("Parsing result should be present") + } + } describe("parsing arguments") { lateinit var result: ClientConfiguration -- cgit 1.2.3-korg