diff options
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt')
-rw-r--r-- | sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt | 40 |
1 files changed, 18 insertions, 22 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 47a2d22a..144aab02 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,6 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser @@ -32,6 +27,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIX import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.io.InputStream import javax.json.Json @@ -39,20 +35,21 @@ class MessageStreamValidation( private val messageGenerator: VesEventGenerator, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = - IO.monadError().bindingCatch { - val messageParams = parseMessageParams(jsonDescription) - logger.debug { "Parsed message parameters: $messageParams" } - - val expectedEvents = generateEvents(messageParams).bind() - val actualEvents = decodeConsumedEvents(consumedMessages) - - if (shouldValidatePayloads(messageParams)) - expectedEvents == actualEvents - else - validateHeaders(actualEvents, expectedEvents) - - }.fix() + fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>) = + Mono + .fromSupplier { parseMessageParams(jsonDescription) } + .doOnNext { + logger.debug { "Parsed message parameters: $it" } + } + .flatMap { messageParams -> + val actualEvents = decodeConsumedEvents(consumedMessages) + generateEvents(messageParams).map { + if (shouldValidatePayloads(messageParams)) + it == actualEvents + else + validateHeaders(actualEvents, it) + } + } private fun parseMessageParams(input: InputStream): List<VesEventParameters> { val paramsArray = Json.createReader(input).readArray() @@ -97,11 +94,10 @@ class MessageStreamValidation( return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux + private fun generateEvents(parameters: List<VesEventParameters>): Mono<List<VesEvent>> = Flux .fromIterable(parameters) .flatMap { messageGenerator.createMessageFlux(it) } .collectList() - .asIo() private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = consumedMessages.map(VesEvent::parseFrom) |