diff options
author | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-03-08 09:14:56 +0100 |
---|---|---|
committer | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-03-12 12:22:39 +0100 |
commit | 72931fcc9fdcd20a914799fcd8a0a7cb0b406ecf (patch) | |
tree | 6e534b3e253586788057cfcb1ea6aa20bddf2857 /sources/hv-collector-dcae-app-simulator/src | |
parent | 1422bed4c1a002e663fd7c2c4c204831e5f7aa9a (diff) |
Use netty HttpServer in DcaeAppApiServer
Change-Id: I895ad192babd9cc40266d0bec3830fcd4b0e054b
Issue-ID: DCAEGEN2-1325
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src')
2 files changed, 55 insertions, 52 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 88e01c23..d2c5b275 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,13 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator -import org.onap.dcae.collectors.veshv.utils.http.HttpConstants -import org.onap.dcae.collectors.veshv.utils.http.HttpStatus -import org.onap.dcae.collectors.veshv.utils.http.Responses -import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors -import org.onap.dcae.collectors.veshv.utils.http.sendOrError +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.http.* import org.onap.dcae.collectors.veshv.utils.logging.Logger -import ratpack.handling.Chain -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig +import reactor.core.publisher.Mono +import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRoutes import java.net.InetSocketAddress /** @@ -53,67 +51,73 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } - fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> = + fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> = simulator.listenToTopics(kafkaTopics).map { - RatpackServer.start { server -> - server.serverConfig( - ServerConfig.embedded() - .port(socketAddress.port) - ).handlers(::setupHandlers) - } + HttpServer.create() + .host(socketAddress.hostName) + .port(socketAddress.port) + .route(::setRoutes) + .let { NettyServerHandle(it.bindNow()) } } - private fun setupHandlers(chain: Chain) { - chain - .put("configuration/topics") { ctx -> - ctx.request.body.then { body -> - val operation = simulator.listenToTopics(body.text) - ctx.response.sendOrError(operation) - } + private fun setRoutes(route: HttpServerRoutes) { + route + .put("/configuration/topics") { req, res -> + req + .receive().aggregate().asString() + .flatMap { + val option = simulator.listenToTopics(it) + res.sendOrError(option).then() + } } - .delete("messages") { ctx -> - ctx.response.contentType(CONTENT_TEXT) + .delete("/messages") { _, res -> logger.info { "Resetting simulator state" } - ctx.response.sendOrError(simulator.resetState()) + res + .header("Content-type", CONTENT_TEXT) + .sendOrError(simulator.resetState()) } - .get("messages/all/count") { ctx -> + .get("/messages/all/count") { _, res -> logger.info { "Processing request for count of received messages" } simulator.state().fold( { - ctx.response.status(HttpConstants.STATUS_NOT_FOUND) logger.warn { "Error - number of messages could not be specified" } + res.status(HttpConstants.STATUS_NOT_FOUND) }, { logger.info { "Returned number of received messages: ${it.messagesCount}" } - ctx.response - .contentType(CONTENT_TEXT) - .send(it.messagesCount.toString()) - }) + res.sendString(Mono.just(it.messagesCount.toString())) + } + ) } - .post("messages/all/validate") { ctx -> - ctx.request.body.then { body -> - logger.info { "Processing request for message validation" } - val response = simulator.validate(body.inputStream) - .map { isValid -> - if (isValid) { - logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" } - responseValid - } else { - logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" } - responseInvalid - } - } - ctx.response.sendAndHandleErrors(response) - } + .post("/messages/all/validate") { req, res -> + req + .receive().aggregate().asInputStream() + .flatMap { body -> + logger.info { "Processing request for message validation" } + val response = + simulator.validate(body) + .map(::resolveValidationResponse) + res.sendAndHandleErrors(response).then() + } } - .get("healthcheck") { ctx -> + .get("/healthcheck") { _, res -> val status = HttpConstants.STATUS_OK logger.info { "Healthcheck OK, returning status: $status" } - ctx.response.status(status).send() + res.status(status).send() } } + private fun resolveValidationResponse(isValid: Boolean): Response = + if (isValid) { + logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" } + responseValid + } else { + logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" } + responseInvalid + } + + companion object { private const val CONTENT_TEXT = "text/plain" private const val VALID_RESPONSE_MESSAGE = "validation passed" diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 08a94e91..4ad92712 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppAp import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -58,5 +57,5 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiAddress, config.kafkaTopics) - .unit() + .flatMap { it.await() } } |