diff options
Diffstat (limited to 'hv-collector-dcae-app-simulator/src')
6 files changed, 119 insertions, 56 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt index 3f539302..27edde9c 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt @@ -24,12 +24,14 @@ import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DefaultValues.API_PORT import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT internal object DefaultValues { const val API_PORT = 8080 + const val KAFKA_SERVERS = "kafka:9092" + const val KAFKA_TOPICS = "ves_hvRanMeas" } class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { @@ -41,8 +43,8 @@ class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfigur override fun getConfiguration(cmdLine: CommandLine): DcaeAppSimConfiguration { val port = cmdLine.intValue(LISTEN_PORT, API_PORT) - val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS) - val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS).split(",").toSet() + val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS, DefaultValues.KAFKA_SERVERS) + val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS, DefaultValues.KAFKA_TOPICS).split(",").toSet() return DcaeAppSimConfiguration( port, kafkaBootstrapServers, diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt index f7703b86..d53609ca 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -19,10 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions import java.util.* @@ -33,10 +33,10 @@ import java.util.* */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): Mono<Consumer> = Mono.create { sink -> + fun start(): IO<Consumer> = IO { val consumer = Consumer() receiver.receive().subscribe(consumer::update) - sink.success(consumer) + consumer } companion object { diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt index 5f0fe7f6..66169534 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -19,7 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka -import reactor.core.publisher.Mono +import arrow.core.Option +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord /** @@ -27,10 +29,11 @@ import reactor.kafka.receiver.ReceiverRecord * @since June 2018 */ -class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?) +class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>) interface ConsumerStateProvider { - fun currentState(): Mono<ConsumerState> + fun currentState(): ConsumerState + fun reset(): IO<Unit> } class Consumer : ConsumerStateProvider { @@ -38,18 +41,28 @@ class Consumer : ConsumerStateProvider { private var lastKey: ByteArray? = null private var lastValue: ByteArray? = null - override fun currentState(): Mono<ConsumerState> = Mono.create { sink -> - val state = synchronized(this) { - ConsumerState(msgCount, lastKey, lastValue) + override fun currentState() = + ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue)) + + override fun reset() = IO { + synchronized(this) { + msgCount = 0 + lastKey = null + lastValue = null } - sink.success(state) } fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } + synchronized(this) { msgCount++ lastKey = record.key() lastValue = record.value() } } + + companion object { + private val logger = Logger(Consumer::class) + } } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index c037af35..f7d44ede 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -19,32 +19,30 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp +import arrow.core.Failure +import arrow.core.Success import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException +import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main")) fun main(args: Array<String>) { + logger.info("Starting DCAE APP simulator") - try { - logger.info("Starting DCAE APP simulator") - val simulatorConfig = ArgBasedDcaeAppSimConfiguration().parse(args) + val config = ArgBasedDcaeAppSimConfiguration().parse(args) + when (config) { + is Success -> startApp(config.value).unsafeRunSync() + is Failure -> config.handleErrorsInMain("", logger) + } +} - KafkaSource.create(simulatorConfig.kafkaBootstrapServers, simulatorConfig.kafkaTopics) +private fun startApp(config: DcaeAppSimConfiguration) = + KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics) .start() .map(::ApiServer) - .flatMap { it.start(simulatorConfig.apiPort) } - .block() - } catch (e: WrongArgumentException) { - e.printHelp("java org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt") - System.exit(1) - } catch (e: Exception) { - logger.error(e.localizedMessage) - logger.debug("An error occurred when starting ves dcea app simulator", e) - System.exit(2) - } -} + .flatMap { it.start(config.apiPort) } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt index fcb8e131..2fa8abec 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -19,13 +19,17 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote +import arrow.core.Try +import arrow.core.getOrElse +import arrow.effects.IO +import com.google.protobuf.MessageOrBuilder import com.google.protobuf.util.JsonFormat import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider +import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields import org.onap.ves.VesEventV5.VesEvent import ratpack.handling.Chain import ratpack.server.RatpackServer import ratpack.server.ServerConfig -import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -34,41 +38,71 @@ import reactor.core.publisher.Mono class ApiServer(private val consumerState: ConsumerStateProvider) { private val jsonPrinter = JsonFormat.printer() - fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable { - RatpackServer.of { server -> + fun start(port: Int): IO<RatpackServer> = IO { + RatpackServer.start { server -> server.serverConfig(ServerConfig.embedded().port(port)) .handlers(this::setupHandlers) } - }.doOnNext(RatpackServer::start) + } private fun setupHandlers(chain: Chain) { chain .get("messages/count") { ctx -> ctx.response.contentType(CONTENT_TEXT) - consumerState.currentState() - .map { it.msgCount.toString() } - .subscribe(ctx.response::send) + val state = consumerState.currentState() + ctx.response.send(state.msgCount.toString()) } .get("messages/last/key") { ctx -> ctx.response.contentType(CONTENT_JSON) - consumerState.currentState() - .map { it.lastKey } - .map { VesEvent.CommonEventHeader.parseFrom(it) } - .map { jsonPrinter.print(it) } - .subscribe(ctx.response::send) + val state = consumerState.currentState() + val resp = state.lastKey + .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } } + .map(this::protobufToJson) + .getOrElse { "null" } + ctx.response.send(resp) } .get("messages/last/value") { ctx -> ctx.response.contentType(CONTENT_JSON) - consumerState.currentState() - .map { it.lastValue } - .map { VesEvent.parseFrom(it) } - .map { jsonPrinter.print(it) } - .subscribe(ctx.response::send) + val state = consumerState.currentState() + val resp = state.lastValue + .map { Try { VesEvent.parseFrom(it) } } + .map(this::protobufToJson) + .getOrElse { "null" } + ctx.response.send(resp) + } + + .get("messages/last/hvRanMeasFields") { ctx -> + ctx.response.contentType(CONTENT_JSON) + val state = consumerState.currentState() + val resp = state.lastValue + .flatMap { Try { VesEvent.parseFrom(it) }.toOption() } + .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS } + .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } } + .map(this::protobufToJson) + .getOrElse { "null" } + ctx.response.send(resp) + } + + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + consumerState.reset() + .unsafeRunAsync { + it.fold( + { ctx.response.send("NOK") }, + { ctx.response.send("OK") } + ) + } } } + private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String = + parseResult.fold( + { ex -> "\"Failed to parse protobuf: ${ex.message}\"" }, + { jsonPrinter.print(it) }) + + companion object { private const val CONTENT_TEXT = "text/plain" private const val CONTENT_JSON = "application/json" diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt index 817df8e0..d99de17b 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt @@ -19,13 +19,14 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +import arrow.core.Failure +import arrow.core.Success import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException -import kotlin.test.assertFailsWith internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ @@ -38,7 +39,21 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ cut = ArgBasedDcaeAppSimConfiguration() } - fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + fun parseExpectingSuccess(vararg cmdLine: String): DcaeAppSimConfiguration { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> result.value + is Failure -> throw AssertionError("Parsing result should be present") + } + } + + fun parseExpectingFailure(vararg cmdLine: String): Throwable { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> throw AssertionError("parsing should have failed") + is Failure -> result.exception + } + } describe("parsing arguments") { lateinit var result: DcaeAppSimConfiguration @@ -46,7 +61,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ given("all parameters are present in the long form") { beforeEachTest { - result = parse("--listen-port", "6969", + result = parseExpectingSuccess("--listen-port", "6969", "--kafka-bootstrap-servers", kafkaBootstrapServers, "--kafka-topics", kafkaTopics ) @@ -71,7 +86,9 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ given("some parameters are present in the short form") { beforeEachTest { - result = parse("-p", "666", "--kafka-bootstrap-servers", kafkaBootstrapServers, "-f", kafkaTopics) + result = parseExpectingSuccess("-p", "666", + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "-f", kafkaTopics) } it("should set proper port") { @@ -92,7 +109,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ given("all optional parameters are absent") { beforeEachTest { - result = parse("-s", kafkaBootstrapServers, "-f", kafkaTopics) + result = parseExpectingSuccess("-s", kafkaBootstrapServers, "-f", kafkaTopics) } it("should set default port") { @@ -100,21 +117,20 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ } } - describe("required parameter is absent") { given("kafka topics are missing") { it("should throw exception") { - assertFailsWith<WrongArgumentException> { parse("-s", kafkaBootstrapServers) } + assertThat(parseExpectingFailure("-s", kafkaBootstrapServers)) + .isInstanceOf(WrongArgumentException::class.java) } } given("kafka bootstrap servers are missing") { it("should throw exception") { - assertFailsWith<WrongArgumentException> { parse("-f", kafkaTopics) } + assertThat(parseExpectingFailure("-f", kafkaTopics)) + .isInstanceOf(WrongArgumentException::class.java) } } } } - - })
\ No newline at end of file |