diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-14 09:48:46 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 09:49:02 +0200 |
commit | 67689405071acdad2b26d5112b3662605e474ce9 (patch) | |
tree | 3e945129934d5721922fdabf229b0d61b772dfdb /hv-collector-client-simulator/src | |
parent | e7987b7a660060746d5f49e1ec90b1ff90fcf55a (diff) |
Various improvements
* Kotlin upgrade
* Monad usage on APIs
* Idle timeout
* Simulator enhancements
Closes ONAP-390
Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-client-simulator/src')
5 files changed, 131 insertions, 61 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt index b8a4b888..f29b693c 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt @@ -19,16 +19,16 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.config -import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MESSAGES_TO_SEND_AMOUNT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT /** @@ -40,6 +40,8 @@ internal object DefaultValues { const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" const val CERT_FILE = "/etc/ves-hv/client.crt" const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" + const val VES_HV_PORT = 6061 + const val VES_HV_HOST = "veshvcollector" } internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfiguration>(DefaultParser()) { @@ -53,8 +55,8 @@ internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfigu ) override fun getConfiguration(cmdLine: CommandLine): ClientConfiguration { - val host = cmdLine.stringValue(VES_HV_HOST) - val port = cmdLine.intValue(VES_HV_PORT) + val host = cmdLine.stringValue(VES_HV_HOST, DefaultValues.VES_HV_HOST) + val port = cmdLine.intValue(VES_HV_PORT, DefaultValues.VES_HV_PORT) val messagesAmount = cmdLine.longValue(MESSAGES_TO_SEND_AMOUNT, DefaultValues.MESSAGES_AMOUNT) return ClientConfiguration( host, diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index bc7db869..3f872b51 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -19,13 +19,17 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger +import ratpack.exec.Promise import ratpack.handling.Chain import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.ServerConfig -import reactor.core.publisher.Mono +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers import javax.json.Json import javax.json.JsonObject @@ -35,30 +39,46 @@ import javax.json.JsonObject */ class HttpServer(private val vesClient: VesHvClient) { - fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable { - RatpackServer.of { - it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers) + fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) } - }.doOnNext { it.start() } + } private fun configureHandlers(chain: Chain) { - chain.post("simulator") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readObject() } - .map { extractMessageParameters(it) } - .map { MessageFactory.INSTANCE.createMessageFlux(it) } - .onError { handleException(it, ctx) } - .then { - vesClient.send(it) - ctx.response - .status(STATUS_OK) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request accepted") - .build() - .toString()) - } - } + chain + .post("simulator/sync") { ctx -> + createMessageFlux(ctx) + .map { vesClient.sendIo(it) } + .map { it.unsafeRunSync() } + .onError { handleException(it, ctx) } + .then { sendAcceptedResponse(ctx) } + } + .post("simulator/async") { ctx -> + createMessageFlux(ctx) + .map { vesClient.sendRx(it) } + .map { it.subscribeOn(Schedulers.elastic()).subscribe() } + .onError { handleException(it, ctx) } + .then { sendAcceptedResponse(ctx) } + } + } + + private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> { + return ctx.request.body + .map { Json.createReader(it.inputStream).readObject() } + .map { extractMessageParameters(it) } + .map { MessageFactory.INSTANCE.createMessageFlux(it) } + } + + private fun sendAcceptedResponse(ctx: Context) { + ctx.response + .status(STATUS_OK) + .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() + .add("response", "Request accepted") + .build() + .toString()) } private fun handleException(t: Throwable, ctx: Context) { diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 78a72d93..be351b50 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -19,19 +19,22 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.effects.IO import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher +import reactor.core.publisher.EmitterProcessor import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient @@ -50,28 +53,58 @@ class VesHvClient(private val configuration: ClientConfiguration) { } .build() - fun send(messages: Flux<WireFrame>) = - client - .newHandler { _, out -> handler(out, messages) } - .doOnError{logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}")} - .subscribe { logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") } + fun sendIo(messages: Flux<WireFrame>) = IO<Unit> { + sendRx(messages).block() + } + fun sendRx(messages: Flux<WireFrame>): Mono<Void> { + val complete = ReplayProcessor.create<Void>(1) + client + .newHandler { _, output -> handler(complete, messages, output) } + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } - private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> { + private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound): + Publisher<Void> { + val encoder = WireFrameEncoder(nettyOutbound.alloc()) + val context = nettyOutbound.context() + context.onClose { + logger.info { "Connection to ${context.address()} has been closed" } + } - val encoder = WireFrameEncoder(nettyOutbound.alloc()) + // TODO: Close channel after all messages have been sent + // The code bellow doesn't work because it closes the channel earlier and not all are consumed... +// complete.subscribe { +// context.channel().disconnect().addListener { +// if (it.isSuccess) +// logger.info { "Connection closed" } +// else +// logger.warn("Failed to close the connection", it.cause()) +// } +// } val frames = messages .map(encoder::encode) - .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) + .window(MAX_BATCH_SIZE) return nettyOutbound - .options { it.flushOnEach() } - .send(frames) - .then { logger.info("Messages have been sent") } + .options { it.flushOnBoundary() } + .sendGroups(frames) + .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() } private fun createSslContext(config: SecurityConfiguration): SslContext = @@ -83,6 +116,7 @@ class VesHvClient(private val configuration: ClientConfiguration) { .build() companion object { + private const val MAX_BATCH_SIZE = 128 private val logger = Logger(VesHvClient::class) } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 08bc6a71..f2229507 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,35 +19,41 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf +import arrow.core.Failure +import arrow.core.Success +import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException -import org.slf4j.LoggerFactory.getLogger +import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain +import org.onap.dcae.collectors.veshv.utils.logging.Logger -private val logger = getLogger("Simulator :: main") +private val logger = Logger("Simulator :: main") +private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt" /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ fun main(args: Array<String>) { - try { - val clientConfig = ArgBasedClientConfiguration().parse(args) - val vesClient = VesHvClient(clientConfig) + val httpServer = ArgBasedClientConfiguration().parse(args) + .map(::VesHvClient) + .map(::HttpServer) - HttpServer(vesClient) - .start() - .block() - - } catch (e: WrongArgumentException) { - e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") - System.exit(1) - } catch (e: Exception) { - logger.error(e.localizedMessage) - logger.debug("An error occurred when starting ves client", e) - System.exit(2) + when (httpServer) { + is Success -> httpServer.value.start().unsafeRunAsync { + it.fold( + { ex -> + logger.error("Failed to start a server", ex) + }, + { srv -> + logger.info("Started Simulator API server (listening on ${srv.bindHost}:${srv.bindPort})") + } + ) + } + is Failure -> httpServer.handleErrorsInMain(PROGRAM_NAME, logger) } } - diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt index 6420d84d..2746c0a6 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.main.config +import arrow.core.Failure +import arrow.core.Success import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -44,7 +46,13 @@ object ArgBasedClientConfigurationTest : Spek({ cut = ArgBasedClientConfiguration() } - fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + fun parse(vararg cmdLine: String): ClientConfiguration { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> result.value + is Failure -> throw AssertionError("Parsing result should be present") + } + } describe("parsing arguments") { lateinit var result: ClientConfiguration |