diff options
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt')
-rw-r--r-- | sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt | 74 |
1 files changed, 42 insertions, 32 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 36f30e66..5d9a7cfc 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -19,44 +19,44 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.getOrElse import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monadError.monadError -import arrow.instances.option.foldable.fold import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventOuterClass +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux import java.io.InputStream import javax.json.Json class MessageStreamValidation( - private val messageGenerator: MessageGenerator, + private val messageGenerator: VesEventGenerator, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = IO.monadError().bindingCatch { val messageParams = parseMessageParams(jsonDescription) logger.debug { "Parsed message parameters: $messageParams" } + val expectedEvents = generateEvents(messageParams).bind() val actualEvents = decodeConsumedEvents(consumedMessages) - if (shouldValidatePayloads(messageParams)) { + + if (shouldValidatePayloads(messageParams)) expectedEvents == actualEvents - } else { + else validateHeaders(actualEvents, expectedEvents) - } + }.fix() - private fun parseMessageParams(input: InputStream): List<MessageParameters> { - val expectations = Json.createReader(input).readArray() - val messageParams = messageParametersParser.parse(expectations) + private fun parseMessageParams(input: InputStream): List<VesEventParameters> { + val paramsArray = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(paramsArray) return messageParams.fold( { @@ -65,36 +65,46 @@ class MessageStreamValidation( throw IllegalArgumentException("Parsing error: " + it.message) }, { - if (it.isEmpty()) { - val message = "Message param list cannot be empty" - logger.warn { message } - throw IllegalArgumentException(message) - } - it + toVesEventParams(it) } ) } - private fun shouldValidatePayloads(parameters: List<MessageParameters>) = - parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + private fun toVesEventParams(params: List<MessageParameters>): List<VesEventParameters> = + if (params.isEmpty()) { + val message = "Message param list cannot be empty" + logger.warn { message } + throw IllegalArgumentException(message) + } else params.map(::validateMessageParams) + + + private fun validateMessageParams(params: MessageParameters): VesEventParameters = + if (params !is VesEventParameters) { + val message = "Only VesEvent-related message types can be validated. " + + "Correct values are: VALID, TOO_BIG_PAYLOAD, FIXED_PAYLOAD" + logger.warn { message } + throw IllegalArgumentException(message) + } else params + + + private fun shouldValidatePayloads(parameters: List<VesEventParameters>) = + parameters.all { it.messageType == FIXED_PAYLOAD } - private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, - expected: List<VesEventOuterClass.VesEvent>): Boolean { + private fun validateHeaders(actual: List<VesEvent>, + expected: List<VesEvent>): Boolean { val consumedHeaders = actual.map { it.commonEventHeader } val generatedHeaders = expected.map { it.commonEventHeader } return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> = - messageGenerator.createMessageFlux(parameters) - .map(WireFrameMessage::payload) - .map(ByteData::unsafeAsArray) - .map(VesEventOuterClass.VesEvent::parseFrom) - .collectList() - .asIo() + private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux + .fromIterable(parameters) + .flatMap { messageGenerator.createMessageFlux(it) } + .collectList() + .asIo() private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = - consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) + consumedMessages.map(VesEvent::parseFrom) companion object { private val logger = Logger(MessageStreamValidation::class) |