aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-dcae-app-simulator/pom.xml4
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt102
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt5
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt69
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt11
5 files changed, 125 insertions, 66 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/pom.xml b/sources/hv-collector-dcae-app-simulator/pom.xml
index b82c001a..d4ab0563 100644
--- a/sources/hv-collector-dcae-app-simulator/pom.xml
+++ b/sources/hv-collector-dcae-app-simulator/pom.xml
@@ -113,10 +113,6 @@
<artifactId>arrow-syntax</artifactId>
</dependency>
<dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
- </dependency>
- <dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index 88e01c23..d2c5b275 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,15 +21,13 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
-import org.onap.dcae.collectors.veshv.utils.http.Responses
-import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
-import org.onap.dcae.collectors.veshv.utils.http.sendOrError
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.http.*
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRoutes
import java.net.InetSocketAddress
/**
@@ -53,67 +51,73 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
}
- fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> =
+ fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
simulator.listenToTopics(kafkaTopics).map {
- RatpackServer.start { server ->
- server.serverConfig(
- ServerConfig.embedded()
- .port(socketAddress.port)
- ).handlers(::setupHandlers)
- }
+ HttpServer.create()
+ .host(socketAddress.hostName)
+ .port(socketAddress.port)
+ .route(::setRoutes)
+ .let { NettyServerHandle(it.bindNow()) }
}
- private fun setupHandlers(chain: Chain) {
- chain
- .put("configuration/topics") { ctx ->
- ctx.request.body.then { body ->
- val operation = simulator.listenToTopics(body.text)
- ctx.response.sendOrError(operation)
- }
+ private fun setRoutes(route: HttpServerRoutes) {
+ route
+ .put("/configuration/topics") { req, res ->
+ req
+ .receive().aggregate().asString()
+ .flatMap {
+ val option = simulator.listenToTopics(it)
+ res.sendOrError(option).then()
+ }
}
- .delete("messages") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
+ .delete("/messages") { _, res ->
logger.info { "Resetting simulator state" }
- ctx.response.sendOrError(simulator.resetState())
+ res
+ .header("Content-type", CONTENT_TEXT)
+ .sendOrError(simulator.resetState())
}
- .get("messages/all/count") { ctx ->
+ .get("/messages/all/count") { _, res ->
logger.info { "Processing request for count of received messages" }
simulator.state().fold(
{
- ctx.response.status(HttpConstants.STATUS_NOT_FOUND)
logger.warn { "Error - number of messages could not be specified" }
+ res.status(HttpConstants.STATUS_NOT_FOUND)
},
{
logger.info { "Returned number of received messages: ${it.messagesCount}" }
- ctx.response
- .contentType(CONTENT_TEXT)
- .send(it.messagesCount.toString())
- })
+ res.sendString(Mono.just(it.messagesCount.toString()))
+ }
+ )
}
- .post("messages/all/validate") { ctx ->
- ctx.request.body.then { body ->
- logger.info { "Processing request for message validation" }
- val response = simulator.validate(body.inputStream)
- .map { isValid ->
- if (isValid) {
- logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
- responseValid
- } else {
- logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
- responseInvalid
- }
- }
- ctx.response.sendAndHandleErrors(response)
- }
+ .post("/messages/all/validate") { req, res ->
+ req
+ .receive().aggregate().asInputStream()
+ .flatMap { body ->
+ logger.info { "Processing request for message validation" }
+ val response =
+ simulator.validate(body)
+ .map(::resolveValidationResponse)
+ res.sendAndHandleErrors(response).then()
+ }
}
- .get("healthcheck") { ctx ->
+ .get("/healthcheck") { _, res ->
val status = HttpConstants.STATUS_OK
logger.info { "Healthcheck OK, returning status: $status" }
- ctx.response.status(status).send()
+ res.status(status).send()
}
}
+ private fun resolveValidationResponse(isValid: Boolean): Response =
+ if (isValid) {
+ logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
+ responseValid
+ } else {
+ logger.info { "Comparison result: $INVALID_RESPONSE_MESSAGE" }
+ responseInvalid
+ }
+
+
companion object {
private const val CONTENT_TEXT = "text/plain"
private const val VALID_RESPONSE_MESSAGE = "validation passed"
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 08a94e91..4ad92712 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppAp
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -58,5 +57,5 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiAddress, config.kafkaTopics)
- .unit()
+ .flatMap { it.await() }
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
new file mode 100644
index 00000000..33e65e4d
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.netty.NettyOutbound
+import reactor.netty.http.server.HttpServerResponse
+import javax.json.Json
+
+private val logger = Logger("org.onap.dcae.collectors.veshv.utils.http.netty")
+
+fun HttpServerResponse.sendOrError(action: IO<Unit>): NettyOutbound =
+ sendAndHandleErrors(action.map {
+ Response(
+ HttpStatus.OK,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("response", "Request accepted").build()
+ )
+ )
+ })
+
+
+fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutbound =
+ response.attempt().unsafeRunSync().fold(
+ { err ->
+ logger.withWarn { log("Error occurred. Sending .", err) }
+ val message = err.message
+ sendResponse(errorResponse(message))
+ },
+ {
+ sendResponse(it)
+ }
+ )
+
+private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
+ val respWithStatus = status(response.status.number)
+ val responseContent = response.content
+
+ return respWithStatus.sendString(Mono.just(responseContent.serializer.run { responseContent.value.show() }))
+}
+
+private fun errorResponse(message: String?): Response =
+ Response(
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ Content(
+ ContentType.JSON,
+ Json.createObjectBuilder().add("error", message).build()
+ )
+ )
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
index a25b2912..529804a3 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,15 +31,6 @@ import javax.json.Json
private val logger = Logger("org.onap.dcae.collectors.veshv.utils.arrow.ratpack")
-fun ratpack.http.Response.sendOrError(action: IO<Unit>) {
- sendAndHandleErrors(action.map {
- Response(
- HttpStatus.OK,
- Content(
- ContentType.JSON,
- Json.createObjectBuilder().add("response", "Request accepted").build()))
- })
-}
fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Response>) {
when (response) {