From df17f466577b97a12fac39b64b5d113f32b82f2e Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Mon, 4 Feb 2019 15:20:14 +0100 Subject: Generate VesEvents in hv-ves/message-generator - Split message generator on two specialized generators for VesEvent and WireFrame related message types - Refactor whole message-generator module Change-Id: I1266b549a9a4d27213d03e8921298deab2dacb59 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1162 --- .../dcaeapp/impl/MessageStreamValidation.kt | 74 ++++++++++++---------- .../collectors/veshv/simulators/dcaeapp/main.kt | 10 +-- .../dcaeapp/impl/MessageStreamValidationTest.kt | 55 +++++++--------- 3 files changed, 69 insertions(+), 70 deletions(-) (limited to 'sources/hv-collector-dcae-app-simulator') diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 36f30e66..5d9a7cfc 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -19,44 +19,44 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.getOrElse import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monadError.monadError -import arrow.instances.option.foldable.fold import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventOuterClass +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux import java.io.InputStream import javax.json.Json class MessageStreamValidation( - private val messageGenerator: MessageGenerator, + private val messageGenerator: VesEventGenerator, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun validate(jsonDescription: InputStream, consumedMessages: List): IO = IO.monadError().bindingCatch { val messageParams = parseMessageParams(jsonDescription) logger.debug { "Parsed message parameters: $messageParams" } + val expectedEvents = generateEvents(messageParams).bind() val actualEvents = decodeConsumedEvents(consumedMessages) - if (shouldValidatePayloads(messageParams)) { + + if (shouldValidatePayloads(messageParams)) expectedEvents == actualEvents - } else { + else validateHeaders(actualEvents, expectedEvents) - } + }.fix() - private fun parseMessageParams(input: InputStream): List { - val expectations = Json.createReader(input).readArray() - val messageParams = messageParametersParser.parse(expectations) + private fun parseMessageParams(input: InputStream): List { + val paramsArray = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(paramsArray) return messageParams.fold( { @@ -65,36 +65,46 @@ class MessageStreamValidation( throw IllegalArgumentException("Parsing error: " + it.message) }, { - if (it.isEmpty()) { - val message = "Message param list cannot be empty" - logger.warn { message } - throw IllegalArgumentException(message) - } - it + toVesEventParams(it) } ) } - private fun shouldValidatePayloads(parameters: List) = - parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + private fun toVesEventParams(params: List): List = + if (params.isEmpty()) { + val message = "Message param list cannot be empty" + logger.warn { message } + throw IllegalArgumentException(message) + } else params.map(::validateMessageParams) + + + private fun validateMessageParams(params: MessageParameters): VesEventParameters = + if (params !is VesEventParameters) { + val message = "Only VesEvent-related message types can be validated. " + + "Correct values are: VALID, TOO_BIG_PAYLOAD, FIXED_PAYLOAD" + logger.warn { message } + throw IllegalArgumentException(message) + } else params + + + private fun shouldValidatePayloads(parameters: List) = + parameters.all { it.messageType == FIXED_PAYLOAD } - private fun validateHeaders(actual: List, - expected: List): Boolean { + private fun validateHeaders(actual: List, + expected: List): Boolean { val consumedHeaders = actual.map { it.commonEventHeader } val generatedHeaders = expected.map { it.commonEventHeader } return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List): IO> = - messageGenerator.createMessageFlux(parameters) - .map(WireFrameMessage::payload) - .map(ByteData::unsafeAsArray) - .map(VesEventOuterClass.VesEvent::parseFrom) - .collectList() - .asIo() + private fun generateEvents(parameters: List): IO> = Flux + .fromIterable(parameters) + .flatMap { messageGenerator.createMessageFlux(it) } + .collectList() + .asIo() private fun decodeConsumedEvents(consumedMessages: List) = - consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) + consumedMessages.map(VesEvent::parseFrom) companion object { private val logger = Logger(MessageStreamValidation::class) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index abf60b0d..8fba364d 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,15 +20,15 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure -import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.unit +import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory @@ -51,11 +51,11 @@ fun main(args: Array) = } ) - private fun startApp(config: DcaeAppSimConfiguration): IO { logger.info { "Using configuration: $config" } val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) - val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) + val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiAddress, config.kafkaTopics) .unit() diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index a631be76..8fb1b2ef 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -19,23 +19,22 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.Either import arrow.core.Right import com.google.protobuf.ByteString import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.mockito.ArgumentMatchers.anyList -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.tests.utils.assertFailedWithError import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux @@ -47,7 +46,7 @@ import javax.json.stream.JsonParsingException */ internal class MessageStreamValidationTest : Spek({ lateinit var messageParametersParser: MessageParametersParser - lateinit var messageGenerator: MessageGenerator + lateinit var messageGenerator: VesEventGenerator lateinit var cut: MessageStreamValidation beforeEachTest { @@ -67,10 +66,7 @@ internal class MessageStreamValidationTest : Spek({ val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync() // then - when(result) { - is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java) - else -> fail("validation should fail") - } + result.assertFailedWithError { it is JsonParsingException } } it("should return error when message param list is empty") { @@ -81,7 +77,7 @@ internal class MessageStreamValidationTest : Spek({ val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync() // then - assertThat(result.isLeft()).isTrue() + result.assertFailedWithError { it is IllegalArgumentException } } describe("when validating headers only") { @@ -89,11 +85,10 @@ internal class MessageStreamValidationTest : Spek({ // given val jsonAsStream = sampleJsonAsStream() val event = vesEvent() - val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) val receivedMessageBytes = event.toByteArray() - givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, VALID, 1)) + whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event)) // when val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() @@ -107,11 +102,11 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent(payload = "payload A") val receivedEvent = vesEvent(payload = "payload B") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) + val receivedMessageBytes = receivedEvent.toByteArray() - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1)) + whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) // when val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() @@ -125,11 +120,10 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent() val receivedEvent = vesEvent(eventId = "bbb") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1)) + whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) // when val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() @@ -144,11 +138,10 @@ internal class MessageStreamValidationTest : Spek({ // given val jsonAsStream = sampleJsonAsStream() val event = vesEvent() - val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) val receivedMessageBytes = event.toByteArray() - givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event)) // when val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() @@ -162,11 +155,10 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent(payload = "payload A") val receivedEvent = vesEvent(payload = "payload B") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) // when val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() @@ -180,11 +172,10 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent() val receivedEvent = vesEvent("bbb") - val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() - givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) - whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame)) + givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1)) + whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) // when val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() @@ -197,9 +188,9 @@ internal class MessageStreamValidationTest : Spek({ }) - private const val DUMMY_EVENT_ID = "aaa" private const val DUMMY_PAYLOAD = "payload" +private const val sampleJsonArray = """["headersOnly"]""" private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent { return VesEvent.newBuilder() @@ -209,6 +200,4 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P .build() } -private const val sampleJsonArray = """["headersOnly"]""" - private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream() -- cgit 1.2.3-korg