aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt74
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt10
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt55
3 files changed, 69 insertions, 70 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 36f30e66..5d9a7cfc 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -19,44 +19,44 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.getOrElse
import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monadError.monadError
-import arrow.instances.option.foldable.fold
import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventOuterClass
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
import java.io.InputStream
import javax.json.Json
class MessageStreamValidation(
- private val messageGenerator: MessageGenerator,
+ private val messageGenerator: VesEventGenerator,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
IO.monadError().bindingCatch {
val messageParams = parseMessageParams(jsonDescription)
logger.debug { "Parsed message parameters: $messageParams" }
+
val expectedEvents = generateEvents(messageParams).bind()
val actualEvents = decodeConsumedEvents(consumedMessages)
- if (shouldValidatePayloads(messageParams)) {
+
+ if (shouldValidatePayloads(messageParams))
expectedEvents == actualEvents
- } else {
+ else
validateHeaders(actualEvents, expectedEvents)
- }
+
}.fix()
- private fun parseMessageParams(input: InputStream): List<MessageParameters> {
- val expectations = Json.createReader(input).readArray()
- val messageParams = messageParametersParser.parse(expectations)
+ private fun parseMessageParams(input: InputStream): List<VesEventParameters> {
+ val paramsArray = Json.createReader(input).readArray()
+ val messageParams = messageParametersParser.parse(paramsArray)
return messageParams.fold(
{
@@ -65,36 +65,46 @@ class MessageStreamValidation(
throw IllegalArgumentException("Parsing error: " + it.message)
},
{
- if (it.isEmpty()) {
- val message = "Message param list cannot be empty"
- logger.warn { message }
- throw IllegalArgumentException(message)
- }
- it
+ toVesEventParams(it)
}
)
}
- private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
- parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+ private fun toVesEventParams(params: List<MessageParameters>): List<VesEventParameters> =
+ if (params.isEmpty()) {
+ val message = "Message param list cannot be empty"
+ logger.warn { message }
+ throw IllegalArgumentException(message)
+ } else params.map(::validateMessageParams)
+
+
+ private fun validateMessageParams(params: MessageParameters): VesEventParameters =
+ if (params !is VesEventParameters) {
+ val message = "Only VesEvent-related message types can be validated. " +
+ "Correct values are: VALID, TOO_BIG_PAYLOAD, FIXED_PAYLOAD"
+ logger.warn { message }
+ throw IllegalArgumentException(message)
+ } else params
+
+
+ private fun shouldValidatePayloads(parameters: List<VesEventParameters>) =
+ parameters.all { it.messageType == FIXED_PAYLOAD }
- private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>,
- expected: List<VesEventOuterClass.VesEvent>): Boolean {
+ private fun validateHeaders(actual: List<VesEvent>,
+ expected: List<VesEvent>): Boolean {
val consumedHeaders = actual.map { it.commonEventHeader }
val generatedHeaders = expected.map { it.commonEventHeader }
return generatedHeaders == consumedHeaders
}
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
- messageGenerator.createMessageFlux(parameters)
- .map(WireFrameMessage::payload)
- .map(ByteData::unsafeAsArray)
- .map(VesEventOuterClass.VesEvent::parseFrom)
- .collectList()
- .asIo()
+ private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux
+ .fromIterable(parameters)
+ .flatMap { messageGenerator.createMessageFlux(it) }
+ .collectList()
+ .asIo()
private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
+ consumedMessages.map(VesEvent::parseFrom)
companion object {
private val logger = Logger(MessageStreamValidation::class)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index abf60b0d..8fba364d 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -20,15 +20,15 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
+import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
@@ -51,11 +51,11 @@ fun main(args: Array<String>) =
}
)
-
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
logger.info { "Using configuration: $config" }
val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
- val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes)
+ val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiAddress, config.kafkaTopics)
.unit()
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index a631be76..8fb1b2ef 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -19,23 +19,22 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.Either
import arrow.core.Right
import com.google.protobuf.ByteString
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.mockito.ArgumentMatchers.anyList
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.tests.utils.assertFailedWithError
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
@@ -47,7 +46,7 @@ import javax.json.stream.JsonParsingException
*/
internal class MessageStreamValidationTest : Spek({
lateinit var messageParametersParser: MessageParametersParser
- lateinit var messageGenerator: MessageGenerator
+ lateinit var messageGenerator: VesEventGenerator
lateinit var cut: MessageStreamValidation
beforeEachTest {
@@ -67,10 +66,7 @@ internal class MessageStreamValidationTest : Spek({
val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
// then
- when(result) {
- is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
- else -> fail("validation should fail")
- }
+ result.assertFailedWithError { it is JsonParsingException }
}
it("should return error when message param list is empty") {
@@ -81,7 +77,7 @@ internal class MessageStreamValidationTest : Spek({
val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
// then
- assertThat(result.isLeft()).isTrue()
+ result.assertFailedWithError { it is IllegalArgumentException }
}
describe("when validating headers only") {
@@ -89,11 +85,10 @@ internal class MessageStreamValidationTest : Spek({
// given
val jsonAsStream = sampleJsonAsStream()
val event = vesEvent()
- val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
val receivedMessageBytes = event.toByteArray()
- givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+ givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, VALID, 1))
+ whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event))
// when
val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
@@ -107,11 +102,11 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent(payload = "payload A")
val receivedEvent = vesEvent(payload = "payload B")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
+
val receivedMessageBytes = receivedEvent.toByteArray()
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1))
+ whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
// when
val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
@@ -125,11 +120,10 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent()
val receivedEvent = vesEvent(eventId = "bbb")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1))
+ whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
// when
val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
@@ -144,11 +138,10 @@ internal class MessageStreamValidationTest : Spek({
// given
val jsonAsStream = sampleJsonAsStream()
val event = vesEvent()
- val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
val receivedMessageBytes = event.toByteArray()
- givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+ givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event))
// when
val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
@@ -162,11 +155,10 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent(payload = "payload A")
val receivedEvent = vesEvent(payload = "payload B")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
// when
val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
@@ -180,11 +172,10 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent()
val receivedEvent = vesEvent("bbb")
- val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
- givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
- whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1))
+ whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
// when
val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
@@ -197,9 +188,9 @@ internal class MessageStreamValidationTest : Spek({
})
-
private const val DUMMY_EVENT_ID = "aaa"
private const val DUMMY_PAYLOAD = "payload"
+private const val sampleJsonArray = """["headersOnly"]"""
private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
return VesEvent.newBuilder()
@@ -209,6 +200,4 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
.build()
}
-private const val sampleJsonArray = """["headersOnly"]"""
-
private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()