diff options
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main')
3 files changed, 111 insertions, 79 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt index 264033e3..065cdf92 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt @@ -28,7 +28,9 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.* +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { override val cmdLineOptionsList: List<CommandLineOption> = listOf( diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt index 869c5ab6..08bb149f 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -19,17 +19,24 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka -import arrow.core.Option import arrow.effects.IO import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord +import java.util.concurrent.ConcurrentLinkedQueue /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ +class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){ + val messagesCount: Int by lazy { + messages.size + } -class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>) + val consumedMessages: List<ByteArray> by lazy { + messages.toList() + } +} interface ConsumerStateProvider { fun currentState(): ConsumerState @@ -37,31 +44,21 @@ interface ConsumerStateProvider { } class Consumer : ConsumerStateProvider { - private var msgCount = 0L - private var lastKey: ByteArray? = null - private var lastValue: ByteArray? = null - override fun currentState() = - ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue)) + private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() - override fun reset() = IO { - synchronized(this) { - msgCount = 0 - lastKey = null - lastValue = null - } + override fun currentState(): ConsumerState = ConsumerState(consumedMessages) + + override fun reset(): IO<Unit> = IO { + consumedMessages.clear() } fun update(record: ReceiverRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } - - synchronized(this) { - msgCount++ - lastKey = record.key() - lastValue = record.value() - } + consumedMessages.add(record.value()) } + companion object { private val logger = Logger(Consumer::class) } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt index d1d90b00..cb1484b7 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -19,28 +19,31 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote -import arrow.core.Try -import arrow.core.getOrElse import arrow.effects.IO -import com.google.protobuf.MessageOrBuilder -import com.google.protobuf.util.JsonFormat +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields +import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD import org.onap.ves.VesEventV5.VesEvent import ratpack.handling.Chain +import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.ServerConfig +import reactor.core.publisher.Mono +import javax.json.Json /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class ApiServer(private val consumerFactory: ConsumerFactory) { +class ApiServer(private val consumerFactory: ConsumerFactory, + private val messageParametersParser: MessageParametersParser = MessageParametersParser()) { private lateinit var consumerState: ConsumerStateProvider - private val jsonPrinter = JsonFormat.printer() fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO { consumerState = consumerFactory.createConsumerForTopics(kafkaTopics) @@ -57,60 +60,92 @@ class ApiServer(private val consumerFactory: ConsumerFactory) { val topics = extractTopics(it.text) logger.info("Received new configuration. Creating consumer for topics: $topics") consumerState = consumerFactory.createConsumerForTopics(topics) - ctx.response.contentType(CONTENT_TEXT) - ctx.response.send("OK") + ctx.response + .status(STATUS_OK) + .send() } } - - .get("messages/count") { ctx -> - ctx.response.contentType(CONTENT_TEXT) - val state = consumerState.currentState() - ctx.response.send(state.msgCount.toString()) - } - - .get("messages/last/key") { ctx -> - ctx.response.contentType(CONTENT_JSON) - val state = consumerState.currentState() - val resp = state.lastKey - .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } } - .map(this::protobufToJson) - .getOrElse { "null" } - ctx.response.send(resp) - } - - .get("messages/last/value") { ctx -> - ctx.response.contentType(CONTENT_JSON) - val state = consumerState.currentState() - val resp = state.lastValue - .map { Try { VesEvent.parseFrom(it) } } - .map(this::protobufToJson) - .getOrElse { "null" } - ctx.response.send(resp) - } - - .get("messages/last/hvRanMeasFields") { ctx -> - ctx.response.contentType(CONTENT_JSON) - val state = consumerState.currentState() - val resp = state.lastValue - .flatMap { Try { VesEvent.parseFrom(it) }.toOption() } - .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS } - .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } } - .map(this::protobufToJson) - .getOrElse { "null" } - ctx.response.send(resp) - } - .delete("messages") { ctx -> ctx.response.contentType(CONTENT_TEXT) consumerState.reset() .unsafeRunAsync { it.fold( - { ctx.response.send("NOK") }, - { ctx.response.send("OK") } - ) + { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) }, + { ctx.response.status(STATUS_OK) } + ).send() } } + .get("messages/all/count") { ctx -> + val state = consumerState.currentState() + ctx.response + .contentType(CONTENT_TEXT) + .send(state.messagesCount.toString()) + } + .post("messages/all/validate") { ctx -> + ctx.request.body + .map { Json.createReader(it.inputStream).readArray() } + .map { messageParametersParser.parse(it) } + .map { generateEvents(ctx, it) } + .then { (generatedEvents, shouldValidatePayloads) -> + generatedEvents + .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) } + .block() + } + } + } + + private fun generateEvents(ctx: Context, parameters: List<MessageParameters>): + Pair<Mono<List<VesEvent>>, Boolean> = Pair( + + doGenerateEvents(parameters).doOnError { + logger.error("Error occurred when generating messages: $it") + ctx.response + .status(STATUS_INTERNAL_SERVER_ERROR) + .send() + }, + parameters.all { it.messageType == FIXED_PAYLOAD } + ) + + private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE + .createMessageFlux(parameters) + .map(PayloadWireFrameMessage::payload) + .map { decode(it.unsafeAsArray()) } + .collectList() + + + private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes) + + + private fun sendResponse(ctx: Context, + generatedEvents: List<VesEvent>, + shouldValidatePayloads: Boolean) = + resolveResponseStatusCode( + generated = generatedEvents, + consumed = decodeConsumedEvents(), + validatePayloads = shouldValidatePayloads + ).let { ctx.response.status(it).send() } + + + private fun decodeConsumedEvents(): List<VesEvent> = consumerState + .currentState() + .consumedMessages + .map(::decode) + + + private fun resolveResponseStatusCode(generated: List<VesEvent>, + consumed: List<VesEvent>, + validatePayloads: Boolean): Int = + if (validatePayloads) { + if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST + } else { + validateHeaders(consumed, generated) + } + + private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int { + val consumedHeaders = consumed.map { it.commonEventHeader } + val generatedHeaders = generated.map { it.commonEventHeader } + return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST } private fun extractTopics(it: String): Set<String> = @@ -118,16 +153,14 @@ class ApiServer(private val consumerFactory: ConsumerFactory) { .split(",") .toSet() - private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String = - parseResult.fold( - { ex -> "\"Failed to parse protobuf: ${ex.message}\"" }, - { jsonPrinter.print(it) }) - - companion object { private val logger = Logger(ApiServer::class) - private const val CONTENT_TEXT = "text/plain" - private const val CONTENT_JSON = "application/json" + + private const val STATUS_OK = 200 + private const val STATUS_BAD_REQUEST = 400 + private const val STATUS_INTERNAL_SERVER_ERROR = 500 } } + + |