aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt41
1 files changed, 36 insertions, 5 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index 3f4e4fc8..fcb8e131 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,7 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
@@ -29,7 +31,9 @@ import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val messageSource: KafkaSource) {
+class ApiServer(private val consumerState: ConsumerStateProvider) {
+ private val jsonPrinter = JsonFormat.printer()
+
fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
RatpackServer.of { server ->
server.serverConfig(ServerConfig.embedded().port(port))
@@ -38,8 +42,35 @@ class ApiServer(private val messageSource: KafkaSource) {
}.doOnNext(RatpackServer::start)
private fun setupHandlers(chain: Chain) {
- chain.get("messages/count") { ctx ->
- ctx.response.send(messageSource.consumedMessages().toString())
- }
+ chain
+ .get("messages/count") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ consumerState.currentState()
+ .map { it.msgCount.toString() }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/key") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastKey }
+ .map { VesEvent.CommonEventHeader.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/value") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastValue }
+ .map { VesEvent.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+ }
+
+ companion object {
+ private const val CONTENT_TEXT = "text/plain"
+ private const val CONTENT_JSON = "application/json"
}
}