aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-client-simulator/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-14 09:48:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 09:49:02 +0200
commit67689405071acdad2b26d5112b3662605e474ce9 (patch)
tree3e945129934d5721922fdabf229b0d61b772dfdb /hv-collector-client-simulator/src
parente7987b7a660060746d5f49e1ec90b1ff90fcf55a (diff)
Various improvements
* Kotlin upgrade * Monad usage on APIs * Idle timeout * Simulator enhancements Closes ONAP-390 Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-client-simulator/src')
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt16
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt62
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt62
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt42
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt10
5 files changed, 131 insertions, 61 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
index b8a4b888..f29b693c 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
@@ -19,16 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.config
-import org.apache.commons.cli.DefaultParser
import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MESSAGES_TO_SEND_AMOUNT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
/**
@@ -40,6 +40,8 @@ internal object DefaultValues {
const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
const val CERT_FILE = "/etc/ves-hv/client.crt"
const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
+ const val VES_HV_PORT = 6061
+ const val VES_HV_HOST = "veshvcollector"
}
internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfiguration>(DefaultParser()) {
@@ -53,8 +55,8 @@ internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfigu
)
override fun getConfiguration(cmdLine: CommandLine): ClientConfiguration {
- val host = cmdLine.stringValue(VES_HV_HOST)
- val port = cmdLine.intValue(VES_HV_PORT)
+ val host = cmdLine.stringValue(VES_HV_HOST, DefaultValues.VES_HV_HOST)
+ val port = cmdLine.intValue(VES_HV_PORT, DefaultValues.VES_HV_PORT)
val messagesAmount = cmdLine.longValue(MESSAGES_TO_SEND_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
return ClientConfiguration(
host,
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index bc7db869..3f872b51 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -19,13 +19,17 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
import ratpack.handling.Chain
import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import reactor.core.scheduler.Schedulers
import javax.json.Json
import javax.json.JsonObject
@@ -35,30 +39,46 @@ import javax.json.JsonObject
*/
class HttpServer(private val vesClient: VesHvClient) {
- fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
- RatpackServer.of {
- it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+ fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
}
- }.doOnNext { it.start() }
+ }
private fun configureHandlers(chain: Chain) {
- chain.post("simulator") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readObject() }
- .map { extractMessageParameters(it) }
- .map { MessageFactory.INSTANCE.createMessageFlux(it) }
- .onError { handleException(it, ctx) }
- .then {
- vesClient.send(it)
- ctx.response
- .status(STATUS_OK)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request accepted")
- .build()
- .toString())
- }
- }
+ chain
+ .post("simulator/sync") { ctx ->
+ createMessageFlux(ctx)
+ .map { vesClient.sendIo(it) }
+ .map { it.unsafeRunSync() }
+ .onError { handleException(it, ctx) }
+ .then { sendAcceptedResponse(ctx) }
+ }
+ .post("simulator/async") { ctx ->
+ createMessageFlux(ctx)
+ .map { vesClient.sendRx(it) }
+ .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
+ .onError { handleException(it, ctx) }
+ .then { sendAcceptedResponse(ctx) }
+ }
+ }
+
+ private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ return ctx.request.body
+ .map { Json.createReader(it.inputStream).readObject() }
+ .map { extractMessageParameters(it) }
+ .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+ }
+
+ private fun sendAcceptedResponse(ctx: Context) {
+ ctx.response
+ .status(STATUS_OK)
+ .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+ .add("response", "Request accepted")
+ .build()
+ .toString())
}
private fun handleException(t: Throwable, ctx: Context) {
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index 78a72d93..be351b50 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -19,19 +19,22 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.effects.IO
import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
+import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.tcp.TcpClient
@@ -50,28 +53,58 @@ class VesHvClient(private val configuration: ClientConfiguration) {
}
.build()
- fun send(messages: Flux<WireFrame>) =
- client
- .newHandler { _, out -> handler(out, messages) }
- .doOnError{logger.info("Failed to connect to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")}
- .subscribe { logger.info("Connected to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}") }
+ fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+ sendRx(messages).block()
+ }
+ fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+ val complete = ReplayProcessor.create<Void>(1)
+ client
+ .newHandler { _, output -> handler(complete, messages, output) }
+ .doOnError {
+ logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ .subscribe {
+ logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ return complete.then()
+ }
- private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> {
+ private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+ Publisher<Void> {
+ val encoder = WireFrameEncoder(nettyOutbound.alloc())
+ val context = nettyOutbound.context()
+ context.onClose {
+ logger.info { "Connection to ${context.address()} has been closed" }
+ }
- val encoder = WireFrameEncoder(nettyOutbound.alloc())
+ // TODO: Close channel after all messages have been sent
+ // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
+// complete.subscribe {
+// context.channel().disconnect().addListener {
+// if (it.isSuccess)
+// logger.info { "Connection closed" }
+// else
+// logger.warn("Failed to close the connection", it.cause())
+// }
+// }
val frames = messages
.map(encoder::encode)
- .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
+ .window(MAX_BATCH_SIZE)
return nettyOutbound
- .options { it.flushOnEach() }
- .send(frames)
- .then { logger.info("Messages have been sent") }
+ .options { it.flushOnBoundary() }
+ .sendGroups(frames)
+ .send(Mono.just(Unpooled.EMPTY_BUFFER))
+ .then {
+ logger.info("Messages have been sent")
+ complete.onComplete()
+ }
+ .then()
}
private fun createSslContext(config: SecurityConfiguration): SslContext =
@@ -83,6 +116,7 @@ class VesHvClient(private val configuration: ClientConfiguration) {
.build()
companion object {
+ private const val MAX_BATCH_SIZE = 128
private val logger = Logger(VesHvClient::class)
}
}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 08bc6a71..f2229507 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,35 +19,41 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
+import arrow.core.Failure
+import arrow.core.Success
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import org.slf4j.LoggerFactory.getLogger
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
-private val logger = getLogger("Simulator :: main")
+private val logger = Logger("Simulator :: main")
+private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
fun main(args: Array<String>) {
- try {
- val clientConfig = ArgBasedClientConfiguration().parse(args)
- val vesClient = VesHvClient(clientConfig)
+ val httpServer = ArgBasedClientConfiguration().parse(args)
+ .map(::VesHvClient)
+ .map(::HttpServer)
- HttpServer(vesClient)
- .start()
- .block()
-
- } catch (e: WrongArgumentException) {
- e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
- System.exit(1)
- } catch (e: Exception) {
- logger.error(e.localizedMessage)
- logger.debug("An error occurred when starting ves client", e)
- System.exit(2)
+ when (httpServer) {
+ is Success -> httpServer.value.start().unsafeRunAsync {
+ it.fold(
+ { ex ->
+ logger.error("Failed to start a server", ex)
+ },
+ { srv ->
+ logger.info("Started Simulator API server (listening on ${srv.bindHost}:${srv.bindPort})")
+ }
+ )
+ }
+ is Failure -> httpServer.handleErrorsInMain(PROGRAM_NAME, logger)
}
}
-
diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
index 6420d84d..2746c0a6 100644
--- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
+++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.main.config
+import arrow.core.Failure
+import arrow.core.Success
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -44,7 +46,13 @@ object ArgBasedClientConfigurationTest : Spek({
cut = ArgBasedClientConfiguration()
}
- fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+ fun parse(vararg cmdLine: String): ClientConfiguration {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> result.value
+ is Failure -> throw AssertionError("Parsing result should be present")
+ }
+ }
describe("parsing arguments") {
lateinit var result: ClientConfiguration