diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-02-04 15:20:14 +0100 |
---|---|---|
committer | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-02-15 15:09:48 +0100 |
commit | df17f466577b97a12fac39b64b5d113f32b82f2e (patch) | |
tree | 0a8999e593c90f97ed1b4f45b6e8adbbc110a787 /sources/hv-collector-dcae-app-simulator/src/main/kotlin | |
parent | e7204cbcf6af61856330cffc541b6f5c78476a09 (diff) |
Generate VesEvents in hv-ves/message-generator
- Split message generator on two specialized generators
for VesEvent and WireFrame related message types
- Refactor whole message-generator module
Change-Id: I1266b549a9a4d27213d03e8921298deab2dacb59
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1162
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin')
2 files changed, 47 insertions, 37 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 36f30e66..5d9a7cfc 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -19,44 +19,44 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.getOrElse import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monadError.monadError -import arrow.instances.option.foldable.fold import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventOuterClass +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux import java.io.InputStream import javax.json.Json class MessageStreamValidation( - private val messageGenerator: MessageGenerator, + private val messageGenerator: VesEventGenerator, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = IO.monadError().bindingCatch { val messageParams = parseMessageParams(jsonDescription) logger.debug { "Parsed message parameters: $messageParams" } + val expectedEvents = generateEvents(messageParams).bind() val actualEvents = decodeConsumedEvents(consumedMessages) - if (shouldValidatePayloads(messageParams)) { + + if (shouldValidatePayloads(messageParams)) expectedEvents == actualEvents - } else { + else validateHeaders(actualEvents, expectedEvents) - } + }.fix() - private fun parseMessageParams(input: InputStream): List<MessageParameters> { - val expectations = Json.createReader(input).readArray() - val messageParams = messageParametersParser.parse(expectations) + private fun parseMessageParams(input: InputStream): List<VesEventParameters> { + val paramsArray = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(paramsArray) return messageParams.fold( { @@ -65,36 +65,46 @@ class MessageStreamValidation( throw IllegalArgumentException("Parsing error: " + it.message) }, { - if (it.isEmpty()) { - val message = "Message param list cannot be empty" - logger.warn { message } - throw IllegalArgumentException(message) - } - it + toVesEventParams(it) } ) } - private fun shouldValidatePayloads(parameters: List<MessageParameters>) = - parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + private fun toVesEventParams(params: List<MessageParameters>): List<VesEventParameters> = + if (params.isEmpty()) { + val message = "Message param list cannot be empty" + logger.warn { message } + throw IllegalArgumentException(message) + } else params.map(::validateMessageParams) + + + private fun validateMessageParams(params: MessageParameters): VesEventParameters = + if (params !is VesEventParameters) { + val message = "Only VesEvent-related message types can be validated. " + + "Correct values are: VALID, TOO_BIG_PAYLOAD, FIXED_PAYLOAD" + logger.warn { message } + throw IllegalArgumentException(message) + } else params + + + private fun shouldValidatePayloads(parameters: List<VesEventParameters>) = + parameters.all { it.messageType == FIXED_PAYLOAD } - private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, - expected: List<VesEventOuterClass.VesEvent>): Boolean { + private fun validateHeaders(actual: List<VesEvent>, + expected: List<VesEvent>): Boolean { val consumedHeaders = actual.map { it.commonEventHeader } val generatedHeaders = expected.map { it.commonEventHeader } return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> = - messageGenerator.createMessageFlux(parameters) - .map(WireFrameMessage::payload) - .map(ByteData::unsafeAsArray) - .map(VesEventOuterClass.VesEvent::parseFrom) - .collectList() - .asIo() + private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux + .fromIterable(parameters) + .flatMap { messageGenerator.createMessageFlux(it) } + .collectList() + .asIo() private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = - consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) + consumedMessages.map(VesEvent::parseFrom) companion object { private val logger = Logger(MessageStreamValidation::class) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index abf60b0d..8fba364d 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,15 +20,15 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.unit +import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -51,11 +51,11 @@ fun main(args: Array<String>) = } ) - private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { logger.info { "Using configuration: $config" } val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) - val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) + val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiAddress, config.kafkaTopics) .unit() |