aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt14
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt27
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt1
4 files changed, 38 insertions, 10 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
index 66169534..7db69205 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -66,3 +66,9 @@ class Consumer : ConsumerStateProvider {
private val logger = Logger(Consumer::class)
}
}
+
+class ConsumerFactory(val kafkaBootstrapServers: String) {
+ fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
+ return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
+ }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index a8a4cf5a..fb28bc25 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -19,16 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.void
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.slf4j.LoggerFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
private val logger = Logger(PACKAGE_NAME)
@@ -49,8 +49,8 @@ fun main(args: Array<String>) =
)
-private fun startApp(config: DcaeAppSimConfiguration) =
- KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics)
- .start()
- .map(::ApiServer)
- .flatMap { it.start(config.apiPort).void() }
+private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
+ return ApiServer(ConsumerFactory(config.kafkaBootstrapServers))
+ .start(config.apiPort, config.kafkaTopics)
+ .void()
+} \ No newline at end of file
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index 2fa8abec..39b4fe2f 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -24,7 +24,9 @@ import arrow.core.getOrElse
import arrow.effects.IO
import com.google.protobuf.MessageOrBuilder
import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
@@ -35,10 +37,13 @@ import ratpack.server.ServerConfig
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val consumerState: ConsumerStateProvider) {
+class ApiServer(private val consumerFactory: ConsumerFactory) {
+
+ private lateinit var consumerState: ConsumerStateProvider
private val jsonPrinter = JsonFormat.printer()
- fun start(port: Int): IO<RatpackServer> = IO {
+ fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
+ consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
RatpackServer.start { server ->
server.serverConfig(ServerConfig.embedded().port(port))
.handlers(this::setupHandlers)
@@ -47,6 +52,17 @@ class ApiServer(private val consumerState: ConsumerStateProvider) {
private fun setupHandlers(chain: Chain) {
chain
+ .put("configuration/topics") { ctx ->
+ ctx.request.body.then { it ->
+ val topics = extractTopics(it.getText())
+ logger.info("Received new configuration. Creating consumer for topics: $topics")
+ consumerState = consumerFactory.createConsumerForTopics(topics)
+ ctx.response.contentType(CONTENT_TEXT)
+ ctx.response.send("OK")
+ }
+
+ }
+
.get("messages/count") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
val state = consumerState.currentState()
@@ -97,6 +113,11 @@ class ApiServer(private val consumerState: ConsumerStateProvider) {
}
}
+ private fun extractTopics(it: String): Set<String> =
+ it.substringAfter("=")
+ .split(",")
+ .toSet()
+
private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
parseResult.fold(
{ ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
@@ -104,6 +125,8 @@ class ApiServer(private val consumerState: ConsumerStateProvider) {
companion object {
+ private val logger = Logger(ApiServer::class)
+
private const val CONTENT_TEXT = "text/plain"
private const val CONTENT_JSON = "application/json"
}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 1661aeae..8418cd78 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -27,7 +27,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger