aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt74
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt10
2 files changed, 47 insertions, 37 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 36f30e66..5d9a7cfc 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -19,44 +19,44 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.getOrElse
import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monadError.monadError
-import arrow.instances.option.foldable.fold
import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
import java.io.InputStream
import javax.json.Json
class MessageStreamValidation(
- private val messageGenerator: MessageGenerator,
+ private val messageGenerator: VesEventGenerator,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
IO.monadError().bindingCatch {
val messageParams = parseMessageParams(jsonDescription)
logger.debug { "Parsed message parameters: $messageParams" }
+
val expectedEvents = generateEvents(messageParams).bind()
val actualEvents = decodeConsumedEvents(consumedMessages)
- if (shouldValidatePayloads(messageParams)) {
+
+ if (shouldValidatePayloads(messageParams))
expectedEvents == actualEvents
- } else {
+ else
validateHeaders(actualEvents, expectedEvents)
- }
+
}.fix()
- private fun parseMessageParams(input: InputStream): List<MessageParameters> {
- val expectations = Json.createReader(input).readArray()
- val messageParams = messageParametersParser.parse(expectations)
+ private fun parseMessageParams(input: InputStream): List<VesEventParameters> {
+ val paramsArray = Json.createReader(input).readArray()
+ val messageParams = messageParametersParser.parse(paramsArray)
return messageParams.fold(
{
@@ -65,36 +65,46 @@ class MessageStreamValidation(
throw IllegalArgumentException("Parsing error: " + it.message)
},
{
- if (it.isEmpty()) {
- val message = "Message param list cannot be empty"
- logger.warn { message }
- throw IllegalArgumentException(message)
- }
- it
+ toVesEventParams(it)
}
)
}
- private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
- parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+ private fun toVesEventParams(params: List<MessageParameters>): List<VesEventParameters> =
+ if (params.isEmpty()) {
+ val message = "Message param list cannot be empty"
+ logger.warn { message }
+ throw IllegalArgumentException(message)
+ } else params.map(::validateMessageParams)
+
+
+ private fun validateMessageParams(params: MessageParameters): VesEventParameters =
+ if (params !is VesEventParameters) {
+ val message = "Only VesEvent-related message types can be validated. " +
+ "Correct values are: VALID, TOO_BIG_PAYLOAD, FIXED_PAYLOAD"
+ logger.warn { message }
+ throw IllegalArgumentException(message)
+ } else params
+
+
+ private fun shouldValidatePayloads(parameters: List<VesEventParameters>) =
+ parameters.all { it.messageType == FIXED_PAYLOAD }
- private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>,
- expected: List<VesEventOuterClass.VesEvent>): Boolean {
+ private fun validateHeaders(actual: List<VesEvent>,
+ expected: List<VesEvent>): Boolean {
val consumedHeaders = actual.map { it.commonEventHeader }
val generatedHeaders = expected.map { it.commonEventHeader }
return generatedHeaders == consumedHeaders
}
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
- messageGenerator.createMessageFlux(parameters)
- .map(WireFrameMessage::payload)
- .map(ByteData::unsafeAsArray)
- .map(VesEventOuterClass.VesEvent::parseFrom)
- .collectList()
- .asIo()
+ private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux
+ .fromIterable(parameters)
+ .flatMap { messageGenerator.createMessageFlux(it) }
+ .collectList()
+ .asIo()
private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
+ consumedMessages.map(VesEvent::parseFrom)
companion object {
private val logger = Logger(MessageStreamValidation::class)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index abf60b0d..8fba364d 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -20,15 +20,15 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
+import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -51,11 +51,11 @@ fun main(args: Array<String>) =
}
)
-
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
logger.info { "Using configuration: $config" }
val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
- val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes)
+ val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiAddress, config.kafkaTopics)
.unit()