aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator/src')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt8
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt27
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt30
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt68
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt36
6 files changed, 119 insertions, 56 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
index 3f539302..27edde9c 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
@@ -24,12 +24,14 @@ import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DefaultValues.API_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
internal object DefaultValues {
const val API_PORT = 8080
+ const val KAFKA_SERVERS = "kafka:9092"
+ const val KAFKA_TOPICS = "ves_hvRanMeas"
}
class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
@@ -41,8 +43,8 @@ class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfigur
override fun getConfiguration(cmdLine: CommandLine): DcaeAppSimConfiguration {
val port = cmdLine.intValue(LISTEN_PORT, API_PORT)
- val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS)
- val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS).split(",").toSet()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS, DefaultValues.KAFKA_SERVERS)
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS, DefaultValues.KAFKA_TOPICS).split(",").toSet()
return DcaeAppSimConfiguration(
port,
kafkaBootstrapServers,
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
index f7703b86..d53609ca 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
@@ -19,10 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+import arrow.effects.IO
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import java.util.*
@@ -33,10 +33,10 @@ import java.util.*
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- fun start(): Mono<Consumer> = Mono.create { sink ->
+ fun start(): IO<Consumer> = IO {
val consumer = Consumer()
receiver.receive().subscribe(consumer::update)
- sink.success(consumer)
+ consumer
}
companion object {
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
index 5f0fe7f6..66169534 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -19,7 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
-import reactor.core.publisher.Mono
+import arrow.core.Option
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
/**
@@ -27,10 +29,11 @@ import reactor.kafka.receiver.ReceiverRecord
* @since June 2018
*/
-class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
interface ConsumerStateProvider {
- fun currentState(): Mono<ConsumerState>
+ fun currentState(): ConsumerState
+ fun reset(): IO<Unit>
}
class Consumer : ConsumerStateProvider {
@@ -38,18 +41,28 @@ class Consumer : ConsumerStateProvider {
private var lastKey: ByteArray? = null
private var lastValue: ByteArray? = null
- override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
- val state = synchronized(this) {
- ConsumerState(msgCount, lastKey, lastValue)
+ override fun currentState() =
+ ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+
+ override fun reset() = IO {
+ synchronized(this) {
+ msgCount = 0
+ lastKey = null
+ lastValue = null
}
- sink.success(state)
}
fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
+
synchronized(this) {
msgCount++
lastKey = record.key()
lastValue = record.value()
}
}
+
+ companion object {
+ private val logger = Logger(Consumer::class)
+ }
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index c037af35..f7d44ede 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -19,32 +19,30 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
+import arrow.core.Failure
+import arrow.core.Success
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.slf4j.LoggerFactory
private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
fun main(args: Array<String>) {
+ logger.info("Starting DCAE APP simulator")
- try {
- logger.info("Starting DCAE APP simulator")
- val simulatorConfig = ArgBasedDcaeAppSimConfiguration().parse(args)
+ val config = ArgBasedDcaeAppSimConfiguration().parse(args)
+ when (config) {
+ is Success -> startApp(config.value).unsafeRunSync()
+ is Failure -> config.handleErrorsInMain("", logger)
+ }
+}
- KafkaSource.create(simulatorConfig.kafkaBootstrapServers, simulatorConfig.kafkaTopics)
+private fun startApp(config: DcaeAppSimConfiguration) =
+ KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics)
.start()
.map(::ApiServer)
- .flatMap { it.start(simulatorConfig.apiPort) }
- .block()
- } catch (e: WrongArgumentException) {
- e.printHelp("java org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt")
- System.exit(1)
- } catch (e: Exception) {
- logger.error(e.localizedMessage)
- logger.debug("An error occurred when starting ves dcea app simulator", e)
- System.exit(2)
- }
-}
+ .flatMap { it.start(config.apiPort) }
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index fcb8e131..2fa8abec 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,13 +19,17 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
+import arrow.core.Try
+import arrow.core.getOrElse
+import arrow.effects.IO
+import com.google.protobuf.MessageOrBuilder
import com.google.protobuf.util.JsonFormat
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -34,41 +38,71 @@ import reactor.core.publisher.Mono
class ApiServer(private val consumerState: ConsumerStateProvider) {
private val jsonPrinter = JsonFormat.printer()
- fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
- RatpackServer.of { server ->
+ fun start(port: Int): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
server.serverConfig(ServerConfig.embedded().port(port))
.handlers(this::setupHandlers)
}
- }.doOnNext(RatpackServer::start)
+ }
private fun setupHandlers(chain: Chain) {
chain
.get("messages/count") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
- consumerState.currentState()
- .map { it.msgCount.toString() }
- .subscribe(ctx.response::send)
+ val state = consumerState.currentState()
+ ctx.response.send(state.msgCount.toString())
}
.get("messages/last/key") { ctx ->
ctx.response.contentType(CONTENT_JSON)
- consumerState.currentState()
- .map { it.lastKey }
- .map { VesEvent.CommonEventHeader.parseFrom(it) }
- .map { jsonPrinter.print(it) }
- .subscribe(ctx.response::send)
+ val state = consumerState.currentState()
+ val resp = state.lastKey
+ .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
+ .map(this::protobufToJson)
+ .getOrElse { "null" }
+ ctx.response.send(resp)
}
.get("messages/last/value") { ctx ->
ctx.response.contentType(CONTENT_JSON)
- consumerState.currentState()
- .map { it.lastValue }
- .map { VesEvent.parseFrom(it) }
- .map { jsonPrinter.print(it) }
- .subscribe(ctx.response::send)
+ val state = consumerState.currentState()
+ val resp = state.lastValue
+ .map { Try { VesEvent.parseFrom(it) } }
+ .map(this::protobufToJson)
+ .getOrElse { "null" }
+ ctx.response.send(resp)
+ }
+
+ .get("messages/last/hvRanMeasFields") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ val state = consumerState.currentState()
+ val resp = state.lastValue
+ .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
+ .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
+ .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
+ .map(this::protobufToJson)
+ .getOrElse { "null" }
+ ctx.response.send(resp)
+ }
+
+ .delete("messages") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ consumerState.reset()
+ .unsafeRunAsync {
+ it.fold(
+ { ctx.response.send("NOK") },
+ { ctx.response.send("OK") }
+ )
+ }
}
}
+ private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
+ parseResult.fold(
+ { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
+ { jsonPrinter.print(it) })
+
+
companion object {
private const val CONTENT_TEXT = "text/plain"
private const val CONTENT_JSON = "application/json"
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
index 817df8e0..d99de17b 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
@@ -19,13 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+import arrow.core.Failure
+import arrow.core.Success
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import kotlin.test.assertFailsWith
internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
@@ -38,7 +39,21 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
cut = ArgBasedDcaeAppSimConfiguration()
}
- fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+ fun parseExpectingSuccess(vararg cmdLine: String): DcaeAppSimConfiguration {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> result.value
+ is Failure -> throw AssertionError("Parsing result should be present")
+ }
+ }
+
+ fun parseExpectingFailure(vararg cmdLine: String): Throwable {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> throw AssertionError("parsing should have failed")
+ is Failure -> result.exception
+ }
+ }
describe("parsing arguments") {
lateinit var result: DcaeAppSimConfiguration
@@ -46,7 +61,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
given("all parameters are present in the long form") {
beforeEachTest {
- result = parse("--listen-port", "6969",
+ result = parseExpectingSuccess("--listen-port", "6969",
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--kafka-topics", kafkaTopics
)
@@ -71,7 +86,9 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
given("some parameters are present in the short form") {
beforeEachTest {
- result = parse("-p", "666", "--kafka-bootstrap-servers", kafkaBootstrapServers, "-f", kafkaTopics)
+ result = parseExpectingSuccess("-p", "666",
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "-f", kafkaTopics)
}
it("should set proper port") {
@@ -92,7 +109,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
given("all optional parameters are absent") {
beforeEachTest {
- result = parse("-s", kafkaBootstrapServers, "-f", kafkaTopics)
+ result = parseExpectingSuccess("-s", kafkaBootstrapServers, "-f", kafkaTopics)
}
it("should set default port") {
@@ -100,21 +117,20 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
}
}
-
describe("required parameter is absent") {
given("kafka topics are missing") {
it("should throw exception") {
- assertFailsWith<WrongArgumentException> { parse("-s", kafkaBootstrapServers) }
+ assertThat(parseExpectingFailure("-s", kafkaBootstrapServers))
+ .isInstanceOf(WrongArgumentException::class.java)
}
}
given("kafka bootstrap servers are missing") {
it("should throw exception") {
- assertFailsWith<WrongArgumentException> { parse("-f", kafkaTopics) }
+ assertThat(parseExpectingFailure("-f", kafkaTopics))
+ .isInstanceOf(WrongArgumentException::class.java)
}
}
}
}
-
-
}) \ No newline at end of file