aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt4
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt35
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt151
3 files changed, 111 insertions, 79 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
index 264033e3..065cdf92 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
@@ -28,7 +28,9 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
override val cmdLineOptionsList: List<CommandLineOption> = listOf(
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
index 869c5ab6..08bb149f 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -19,17 +19,24 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
-import arrow.core.Option
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
+import java.util.concurrent.ConcurrentLinkedQueue
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+ val messagesCount: Int by lazy {
+ messages.size
+ }
-class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
+ val consumedMessages: List<ByteArray> by lazy {
+ messages.toList()
+ }
+}
interface ConsumerStateProvider {
fun currentState(): ConsumerState
@@ -37,31 +44,21 @@ interface ConsumerStateProvider {
}
class Consumer : ConsumerStateProvider {
- private var msgCount = 0L
- private var lastKey: ByteArray? = null
- private var lastValue: ByteArray? = null
- override fun currentState() =
- ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+ private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
- override fun reset() = IO {
- synchronized(this) {
- msgCount = 0
- lastKey = null
- lastValue = null
- }
+ override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
+
+ override fun reset(): IO<Unit> = IO {
+ consumedMessages.clear()
}
fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
-
- synchronized(this) {
- msgCount++
- lastKey = record.key()
- lastValue = record.value()
- }
+ consumedMessages.add(record.value())
}
+
companion object {
private val logger = Logger(Consumer::class)
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index d1d90b00..cb1484b7 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,28 +19,31 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-import arrow.core.Try
-import arrow.core.getOrElse
import arrow.effects.IO
-import com.google.protobuf.MessageOrBuilder
-import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
+import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
+import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+import javax.json.Json
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val consumerFactory: ConsumerFactory) {
+class ApiServer(private val consumerFactory: ConsumerFactory,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
private lateinit var consumerState: ConsumerStateProvider
- private val jsonPrinter = JsonFormat.printer()
fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
@@ -57,60 +60,92 @@ class ApiServer(private val consumerFactory: ConsumerFactory) {
val topics = extractTopics(it.text)
logger.info("Received new configuration. Creating consumer for topics: $topics")
consumerState = consumerFactory.createConsumerForTopics(topics)
- ctx.response.contentType(CONTENT_TEXT)
- ctx.response.send("OK")
+ ctx.response
+ .status(STATUS_OK)
+ .send()
}
}
-
- .get("messages/count") { ctx ->
- ctx.response.contentType(CONTENT_TEXT)
- val state = consumerState.currentState()
- ctx.response.send(state.msgCount.toString())
- }
-
- .get("messages/last/key") { ctx ->
- ctx.response.contentType(CONTENT_JSON)
- val state = consumerState.currentState()
- val resp = state.lastKey
- .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
- .map(this::protobufToJson)
- .getOrElse { "null" }
- ctx.response.send(resp)
- }
-
- .get("messages/last/value") { ctx ->
- ctx.response.contentType(CONTENT_JSON)
- val state = consumerState.currentState()
- val resp = state.lastValue
- .map { Try { VesEvent.parseFrom(it) } }
- .map(this::protobufToJson)
- .getOrElse { "null" }
- ctx.response.send(resp)
- }
-
- .get("messages/last/hvRanMeasFields") { ctx ->
- ctx.response.contentType(CONTENT_JSON)
- val state = consumerState.currentState()
- val resp = state.lastValue
- .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
- .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
- .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
- .map(this::protobufToJson)
- .getOrElse { "null" }
- ctx.response.send(resp)
- }
-
.delete("messages") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
consumerState.reset()
.unsafeRunAsync {
it.fold(
- { ctx.response.send("NOK") },
- { ctx.response.send("OK") }
- )
+ { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
+ { ctx.response.status(STATUS_OK) }
+ ).send()
}
}
+ .get("messages/all/count") { ctx ->
+ val state = consumerState.currentState()
+ ctx.response
+ .contentType(CONTENT_TEXT)
+ .send(state.messagesCount.toString())
+ }
+ .post("messages/all/validate") { ctx ->
+ ctx.request.body
+ .map { Json.createReader(it.inputStream).readArray() }
+ .map { messageParametersParser.parse(it) }
+ .map { generateEvents(ctx, it) }
+ .then { (generatedEvents, shouldValidatePayloads) ->
+ generatedEvents
+ .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
+ .block()
+ }
+ }
+ }
+
+ private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
+ Pair<Mono<List<VesEvent>>, Boolean> = Pair(
+
+ doGenerateEvents(parameters).doOnError {
+ logger.error("Error occurred when generating messages: $it")
+ ctx.response
+ .status(STATUS_INTERNAL_SERVER_ERROR)
+ .send()
+ },
+ parameters.all { it.messageType == FIXED_PAYLOAD }
+ )
+
+ private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
+ .createMessageFlux(parameters)
+ .map(PayloadWireFrameMessage::payload)
+ .map { decode(it.unsafeAsArray()) }
+ .collectList()
+
+
+ private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
+
+
+ private fun sendResponse(ctx: Context,
+ generatedEvents: List<VesEvent>,
+ shouldValidatePayloads: Boolean) =
+ resolveResponseStatusCode(
+ generated = generatedEvents,
+ consumed = decodeConsumedEvents(),
+ validatePayloads = shouldValidatePayloads
+ ).let { ctx.response.status(it).send() }
+
+
+ private fun decodeConsumedEvents(): List<VesEvent> = consumerState
+ .currentState()
+ .consumedMessages
+ .map(::decode)
+
+
+ private fun resolveResponseStatusCode(generated: List<VesEvent>,
+ consumed: List<VesEvent>,
+ validatePayloads: Boolean): Int =
+ if (validatePayloads) {
+ if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
+ } else {
+ validateHeaders(consumed, generated)
+ }
+
+ private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
+ val consumedHeaders = consumed.map { it.commonEventHeader }
+ val generatedHeaders = generated.map { it.commonEventHeader }
+ return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
}
private fun extractTopics(it: String): Set<String> =
@@ -118,16 +153,14 @@ class ApiServer(private val consumerFactory: ConsumerFactory) {
.split(",")
.toSet()
- private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
- parseResult.fold(
- { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
- { jsonPrinter.print(it) })
-
-
companion object {
private val logger = Logger(ApiServer::class)
-
private const val CONTENT_TEXT = "text/plain"
- private const val CONTENT_JSON = "application/json"
+
+ private const val STATUS_OK = 200
+ private const val STATUS_BAD_REQUEST = 400
+ private const val STATUS_INTERNAL_SERVER_ERROR = 500
}
}
+
+