diff options
Diffstat (limited to 'hv-collector-dcae-app-simulator/src')
3 files changed, 38 insertions, 9 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt index 66169534..7db69205 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -66,3 +66,9 @@ class Consumer : ConsumerStateProvider { private val logger = Logger(Consumer::class) } } + +class ConsumerFactory(val kafkaBootstrapServers: String) { + fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider { + return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync() + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index a8a4cf5a..fb28bc25 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -19,16 +19,16 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp +import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.void import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.slf4j.LoggerFactory private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" private val logger = Logger(PACKAGE_NAME) @@ -49,8 +49,8 @@ fun main(args: Array<String>) = ) -private fun startApp(config: DcaeAppSimConfiguration) = - KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics) - .start() - .map(::ApiServer) - .flatMap { it.start(config.apiPort).void() } +private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { + return ApiServer(ConsumerFactory(config.kafkaBootstrapServers)) + .start(config.apiPort, config.kafkaTopics) + .void() +}
\ No newline at end of file diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt index 2fa8abec..39b4fe2f 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -24,7 +24,9 @@ import arrow.core.getOrElse import arrow.effects.IO import com.google.protobuf.MessageOrBuilder import com.google.protobuf.util.JsonFormat +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields import org.onap.ves.VesEventV5.VesEvent import ratpack.handling.Chain @@ -35,10 +37,13 @@ import ratpack.server.ServerConfig * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class ApiServer(private val consumerState: ConsumerStateProvider) { +class ApiServer(private val consumerFactory: ConsumerFactory) { + + private lateinit var consumerState: ConsumerStateProvider private val jsonPrinter = JsonFormat.printer() - fun start(port: Int): IO<RatpackServer> = IO { + fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO { + consumerState = consumerFactory.createConsumerForTopics(kafkaTopics) RatpackServer.start { server -> server.serverConfig(ServerConfig.embedded().port(port)) .handlers(this::setupHandlers) @@ -47,6 +52,17 @@ class ApiServer(private val consumerState: ConsumerStateProvider) { private fun setupHandlers(chain: Chain) { chain + .put("configuration/topics") { ctx -> + ctx.request.body.then { it -> + val topics = extractTopics(it.getText()) + logger.info("Received new configuration. Creating consumer for topics: $topics") + consumerState = consumerFactory.createConsumerForTopics(topics) + ctx.response.contentType(CONTENT_TEXT) + ctx.response.send("OK") + } + + } + .get("messages/count") { ctx -> ctx.response.contentType(CONTENT_TEXT) val state = consumerState.currentState() @@ -97,6 +113,11 @@ class ApiServer(private val consumerState: ConsumerStateProvider) { } } + private fun extractTopics(it: String): Set<String> = + it.substringAfter("=") + .split(",") + .toSet() + private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String = parseResult.fold( { ex -> "\"Failed to parse protobuf: ${ex.message}\"" }, @@ -104,6 +125,8 @@ class ApiServer(private val consumerState: ConsumerStateProvider) { companion object { + private val logger = Logger(ApiServer::class) + private const val CONTENT_TEXT = "text/plain" private const val CONTENT_JSON = "application/json" } |