diff options
Diffstat (limited to 'hv-collector-xnf-simulator')
2 files changed, 15 insertions, 39 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index 24ef578d..de686bc5 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -20,27 +20,22 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import ratpack.exec.Promise import ratpack.handling.Chain import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.ServerConfig -import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers import javax.json.Json -import javax.json.JsonArray /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -internal class HttpServer(private val vesClient: XnfSimulator) { +internal class HttpServer(private val vesClient: XnfSimulator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser()) { fun start(port: Int): IO<RatpackServer> = IO { RatpackServer.start { server -> @@ -52,14 +47,20 @@ internal class HttpServer(private val vesClient: XnfSimulator) { private fun configureHandlers(chain: Chain) { chain .post("simulator/sync") { ctx -> - createMessageFlux(ctx) + ctx.request.body + .map { Json.createReader(it.inputStream).readArray() } + .map { messageParametersParser.parse(it) } + .map { MessageGenerator.INSTANCE.createMessageFlux(it) } .map { vesClient.sendIo(it) } .map { it.unsafeRunSync() } .onError { handleException(it, ctx) } .then { sendAcceptedResponse(ctx) } } .post("simulator/async") { ctx -> - createMessageFlux(ctx) + ctx.request.body + .map { Json.createReader(it.inputStream).readArray() } + .map { messageParametersParser.parse(it) } + .map { MessageGenerator.INSTANCE.createMessageFlux(it) } .map { vesClient.sendRx(it) } .map { it.subscribeOn(Schedulers.elastic()).subscribe() } .onError { handleException(it, ctx) } @@ -67,28 +68,6 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> { - return ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { extractMessageParameters(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - } - - private fun extractMessageParameters(request: JsonArray): List<MessageParameters> = - try { - request - .map { it.asJsonObject() } - .map { - - val domain = Domain.valueOf(it.getString("domain")) - val messageType = MessageType.valueOf(it.getString("messageType")) - val messagesAmount = it.getJsonNumber("messagesAmount").longValue() - MessageParameters(domain, messageType, messagesAmount) - } - } catch (e: Exception) { - throw ValidationException("Validating request body failed", e) - } - private fun sendAcceptedResponse(ctx: Context) { ctx.response .status(STATUS_OK) @@ -117,5 +96,3 @@ internal class HttpServer(private val vesClient: XnfSimulator) { const val CONTENT_TYPE_APPLICATION_JSON = "application/json" } } - -internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause) diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 19c52efa..fa6d626b 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -38,10 +38,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map { - XnfSimulator(it) - .run(::HttpServer) - .start(it.listenPort) + .map {config -> + XnfSimulator(config) + .let { HttpServer(it) } + .start(config.listenPort) .void() } .unsafeRunEitherSync( @@ -53,4 +53,3 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) logger.info("Started xNF Simulator API server") } ) - |