aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt74
1 files changed, 42 insertions, 32 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 36f30e66..5d9a7cfc 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -19,44 +19,44 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.getOrElse
import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monadError.monadError
-import arrow.instances.option.foldable.fold
import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
import java.io.InputStream
import javax.json.Json
class MessageStreamValidation(
- private val messageGenerator: MessageGenerator,
+ private val messageGenerator: VesEventGenerator,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
IO.monadError().bindingCatch {
val messageParams = parseMessageParams(jsonDescription)
logger.debug { "Parsed message parameters: $messageParams" }
+
val expectedEvents = generateEvents(messageParams).bind()
val actualEvents = decodeConsumedEvents(consumedMessages)
- if (shouldValidatePayloads(messageParams)) {
+
+ if (shouldValidatePayloads(messageParams))
expectedEvents == actualEvents
- } else {
+ else
validateHeaders(actualEvents, expectedEvents)
- }
+
}.fix()
- private fun parseMessageParams(input: InputStream): List<MessageParameters> {
- val expectations = Json.createReader(input).readArray()
- val messageParams = messageParametersParser.parse(expectations)
+ private fun parseMessageParams(input: InputStream): List<VesEventParameters> {
+ val paramsArray = Json.createReader(input).readArray()
+ val messageParams = messageParametersParser.parse(paramsArray)
return messageParams.fold(
{
@@ -65,36 +65,46 @@ class MessageStreamValidation(
throw IllegalArgumentException("Parsing error: " + it.message)
},
{
- if (it.isEmpty()) {
- val message = "Message param list cannot be empty"
- logger.warn { message }
- throw IllegalArgumentException(message)
- }
- it
+ toVesEventParams(it)
}
)
}
- private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
- parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+ private fun toVesEventParams(params: List<MessageParameters>): List<VesEventParameters> =
+ if (params.isEmpty()) {
+ val message = "Message param list cannot be empty"
+ logger.warn { message }
+ throw IllegalArgumentException(message)
+ } else params.map(::validateMessageParams)
+
+
+ private fun validateMessageParams(params: MessageParameters): VesEventParameters =
+ if (params !is VesEventParameters) {
+ val message = "Only VesEvent-related message types can be validated. " +
+ "Correct values are: VALID, TOO_BIG_PAYLOAD, FIXED_PAYLOAD"
+ logger.warn { message }
+ throw IllegalArgumentException(message)
+ } else params
+
+
+ private fun shouldValidatePayloads(parameters: List<VesEventParameters>) =
+ parameters.all { it.messageType == FIXED_PAYLOAD }
- private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>,
- expected: List<VesEventOuterClass.VesEvent>): Boolean {
+ private fun validateHeaders(actual: List<VesEvent>,
+ expected: List<VesEvent>): Boolean {
val consumedHeaders = actual.map { it.commonEventHeader }
val generatedHeaders = expected.map { it.commonEventHeader }
return generatedHeaders == consumedHeaders
}
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
- messageGenerator.createMessageFlux(parameters)
- .map(WireFrameMessage::payload)
- .map(ByteData::unsafeAsArray)
- .map(VesEventOuterClass.VesEvent::parseFrom)
- .collectList()
- .asIo()
+ private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux
+ .fromIterable(parameters)
+ .flatMap { messageGenerator.createMessageFlux(it) }
+ .collectList()
+ .asIo()
private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
+ consumedMessages.map(VesEvent::parseFrom)
companion object {
private val logger = Logger(MessageStreamValidation::class)