diff options
5 files changed, 125 insertions, 66 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml index b82c001a..d4ab0563 100644 --- a/sources/hv-collector-dcae-app-simulator/pom.xml +++ b/sources/hv-collector-dcae-app-simulator/pom.xml @@ -113,10 +113,6 @@ <artifactId>arrow-syntax</artifactId> </dependency> <dependency> - <groupId>io.ratpack</groupId> - <artifactId>ratpack-core</artifactId> - </dependency> - <dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 88e01c23..d2c5b275 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,13 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator -import org.onap.dcae.collectors.veshv.utils.http.HttpConstants -import org.onap.dcae.collectors.veshv.utils.http.HttpStatus -import org.onap.dcae.collectors.veshv.utils.http.Responses -import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors -import org.onap.dcae.collectors.veshv.utils.http.sendOrError +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.http.* import org.onap.dcae.collectors.veshv.utils.logging.Logger -import ratpack.handling.Chain -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig +import reactor.core.publisher.Mono +import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRoutes import java.net.InetSocketAddress /** @@ -53,67 +51,73 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } - fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> = + fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> = simulator.listenToTopics(kafkaTopics).map { - RatpackServer.start { server -> - server.serverConfig( - ServerConfig.embedded() - .port(socketAddress.port) - ).handlers(::setupHandlers) - } + HttpServer.create() + .host(socketAddress.hostName) + .port(socketAddress.port) + .route(::setRoutes) + .let { NettyServerHandle(it.bindNow()) } } - private fun setupHandlers(chain: Chain) { - chain - .put("configuration/topics") { ctx -> - ctx.request.body.then { body -> - val operation = simulator.listenToTopics(body.text) - ctx.response.sendOrError(operation) - } + private fun setRoutes(route: HttpServerRoutes) { + route + .put("/configuration/topics") { req, res -> + req + .receive().aggregate().asString() + .flatMap { + val option = simulator.listenToTopics(it) + res.sendOrError(option).then() + } } - .delete("messages") { ctx -> - ctx.response.contentType(CONTENT_TEXT) + .delete("/messages") { _, res -> logger.info { "Resetting simulator state" } - ctx.response.sendOrError(simulator.resetState()) + res + .header("Content-type", CONTENT_TEXT) + .sendOrError(simulator.resetState()) } - .get("messages/all/count") { ctx -> + .get("/messages/all/count") { _, res -> logger.info { "Processing request for count of received messages" } simulator.state().fold( { - ctx.response.status(HttpConstants.STATUS_NOT_FOUND) logger.warn { "Error - number of messages could not be specified" } + res.status(HttpConstants.STATUS_NOT_FOUND) }, { logger.info { "Returned number of received messages: ${it.messagesCount}" } - ctx.response - .contentType(CONTENT_TEXT) - .send(it.messagesCount.toString()) - }) + res.sendString(Mono.just(it.messagesCount.toString())) + } + ) } - .post("messages/all/validate") { ctx -> - ctx.request.body.then { body -> - logger.info { "Processing request for message validation" } - val response = simulator.validate(body.inputStream) - .map { isValid -> - if (isValid) { - logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" } - responseValid - } else { - logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" } - responseInvalid - } - } - ctx.response.sendAndHandleErrors(response) - } + .post("/messages/all/validate") { req, res -> + req + .receive().aggregate().asInputStream() + .flatMap { body -> + logger.info { "Processing request for message validation" } + val response = + simulator.validate(body) + .map(::resolveValidationResponse) + res.sendAndHandleErrors(response).then() + } } - .get("healthcheck") { ctx -> + .get("/healthcheck") { _, res -> val status = HttpConstants.STATUS_OK logger.info { "Healthcheck OK, returning status: $status" } - ctx.response.status(status).send() + res.status(status).send() } } + private fun resolveValidationResponse(isValid: Boolean): Response = + if (isValid) { + logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" } + responseValid + } else { + logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" } + responseInvalid + } + + companion object { private const val CONTENT_TEXT = "text/plain" private const val VALID_RESPONSE_MESSAGE = "validation passed" diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 08a94e91..4ad92712 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppAp import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -58,5 +57,5 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiAddress, config.kafkaTopics) - .unit() + .flatMap { it.await() } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt new file mode 100644 index 00000000..33e65e4d --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt @@ -0,0 +1,69 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.http + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.netty.NettyOutbound +import reactor.netty.http.server.HttpServerResponse +import javax.json.Json + +private val logger = Logger("org.onap.dcae.collectors.veshv.utils.http.netty") + +fun HttpServerResponse.sendOrError(action: IO<Unit>): NettyOutbound = + sendAndHandleErrors(action.map { + Response( + HttpStatus.OK, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("response", "Request accepted").build() + ) + ) + }) + + +fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutbound = + response.attempt().unsafeRunSync().fold( + { err -> + logger.withWarn { log("Error occurred. Sending .", err) } + val message = err.message + sendResponse(errorResponse(message)) + }, + { + sendResponse(it) + } + ) + +private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound { + val respWithStatus = status(response.status.number) + val responseContent = response.content + + return respWithStatus.sendString(Mono.just(responseContent.serializer.run { responseContent.value.show() })) +} + +private fun errorResponse(message: String?): Response = + Response( + HttpStatus.INTERNAL_SERVER_ERROR, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("error", message).build() + ) + ) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt index a25b2912..529804a3 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,15 +31,6 @@ import javax.json.Json private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack") -fun ratpack.http.Response.sendOrError(action: IO<Unit>) { - sendAndHandleErrors(action.map { - Response( - HttpStatus.OK, - Content( - ContentType.JSON, - Json.createObjectBuilder().add("response", "Request accepted").build())) - }) -} fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) { when (response) { |