aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator
diff options
context:
space:
mode:
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>2019-03-08 09:14:56 +0100
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>2019-03-12 12:22:39 +0100
commit72931fcc9fdcd20a914799fcd8a0a7cb0b406ecf (patch)
tree6e534b3e253586788057cfcb1ea6aa20bddf2857 /sources/hv-collector-dcae-app-simulator
parent1422bed4c1a002e663fd7c2c4c204831e5f7aa9a (diff)
Use netty HttpServer in DcaeAppApiServer
Change-Id: I895ad192babd9cc40266d0bec3830fcd4b0e054b Issue-ID: DCAEGEN2-1325 Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator')
-rw-r--r--sources/hv-collector-dcae-app-simulator/pom.xml4
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt102
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt5
3 files changed, 55 insertions, 56 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml
index b82c001a..d4ab0563 100644
--- a/sources/hv-collector-dcae-app-simulator/pom.xml
+++ b/sources/hv-collector-dcae-app-simulator/pom.xml
@@ -113,10 +113,6 @@
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
- </dependency>
- <dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index 88e01c23..d2c5b275 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,15 +21,13 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
-import org.onap.dcae.collectors.veshv.utils.http.Responses
-import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
-import org.onap.dcae.collectors.veshv.utils.http.sendOrError
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.http.*
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRoutes
import java.net.InetSocketAddress
/**
@@ -53,67 +51,73 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
}
- fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> =
+ fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
simulator.listenToTopics(kafkaTopics).map {
- RatpackServer.start { server ->
- server.serverConfig(
- ServerConfig.embedded()
- .port(socketAddress.port)
- ).handlers(::setupHandlers)
- }
+ HttpServer.create()
+ .host(socketAddress.hostName)
+ .port(socketAddress.port)
+ .route(::setRoutes)
+ .let { NettyServerHandle(it.bindNow()) }
}
- private fun setupHandlers(chain: Chain) {
- chain
- .put("configuration/topics") { ctx ->
- ctx.request.body.then { body ->
- val operation = simulator.listenToTopics(body.text)
- ctx.response.sendOrError(operation)
- }
+ private fun setRoutes(route: HttpServerRoutes) {
+ route
+ .put("/configuration/topics") { req, res ->
+ req
+ .receive().aggregate().asString()
+ .flatMap {
+ val option = simulator.listenToTopics(it)
+ res.sendOrError(option).then()
+ }
}
- .delete("messages") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
+ .delete("/messages") { _, res ->
logger.info { "Resetting simulator state" }
- ctx.response.sendOrError(simulator.resetState())
+ res
+ .header("Content-type", CONTENT_TEXT)
+ .sendOrError(simulator.resetState())
}
- .get("messages/all/count") { ctx ->
+ .get("/messages/all/count") { _, res ->
logger.info { "Processing request for count of received messages" }
simulator.state().fold(
{
- ctx.response.status(HttpConstants.STATUS_NOT_FOUND)
logger.warn { "Error - number of messages could not be specified" }
+ res.status(HttpConstants.STATUS_NOT_FOUND)
},
{
logger.info { "Returned number of received messages: ${it.messagesCount}" }
- ctx.response
- .contentType(CONTENT_TEXT)
- .send(it.messagesCount.toString())
- })
+ res.sendString(Mono.just(it.messagesCount.toString()))
+ }
+ )
}
- .post("messages/all/validate") { ctx ->
- ctx.request.body.then { body ->
- logger.info { "Processing request for message validation" }
- val response = simulator.validate(body.inputStream)
- .map { isValid ->
- if (isValid) {
- logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
- responseValid
- } else {
- logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
- responseInvalid
- }
- }
- ctx.response.sendAndHandleErrors(response)
- }
+ .post("/messages/all/validate") { req, res ->
+ req
+ .receive().aggregate().asInputStream()
+ .flatMap { body ->
+ logger.info { "Processing request for message validation" }
+ val response =
+ simulator.validate(body)
+ .map(::resolveValidationResponse)
+ res.sendAndHandleErrors(response).then()
+ }
}
- .get("healthcheck") { ctx ->
+ .get("/healthcheck") { _, res ->
val status = HttpConstants.STATUS_OK
logger.info { "Healthcheck OK, returning status: $status" }
- ctx.response.status(status).send()
+ res.status(status).send()
}
}
+ private fun resolveValidationResponse(isValid: Boolean): Response =
+ if (isValid) {
+ logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
+ responseValid
+ } else {
+ logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
+ responseInvalid
+ }
+
+
companion object {
private const val CONTENT_TEXT = "text/plain"
private const val VALID_RESPONSE_MESSAGE = "validation passed"
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 08a94e91..4ad92712 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppAp
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -58,5 +57,5 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiAddress, config.kafkaTopics)
- .unit()
+ .flatMap { it.await() }
}