diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-29 07:03:19 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-03-29 07:03:19 +0000 |
commit | 4f5d73f18a7952379505a4995b6afe22acf31c11 (patch) | |
tree | 5d7d4ecf0a64c6668e2aca823bec675c868add62 | |
parent | 2174a045086e16611128b20a6d4357c04d9eac4a (diff) | |
parent | 75b95caba1ec7e126683be1df5928a380174d428 (diff) |
Merge "Remove IO monad usage from simulators"
10 files changed, 148 insertions, 179 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index f7d94de5..28866f36 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.core.getOrElse -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import java.io.InputStream +import java.lang.IllegalArgumentException import java.util.concurrent.atomic.AtomicReference /** @@ -39,7 +37,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) - fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch { + fun listenToTopics(topics: Set<String>) { if (topics.isEmpty() || topics.any { it.isBlank() }) { val message = "Topic list cannot be empty or contain empty elements, topics: $topics" logger.info { message } @@ -47,17 +45,15 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, } logger.info { "Received new configuration. Creating consumer for topics: $topics" } - consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) - }.fix() + consumerState.set(consumerFactory.createConsumerForTopics(topics)) + } fun state() = consumerState.getOption().map { it.currentState() } - fun resetState(): IO<Unit> = consumerState.getOption().fold( - { IO.unit }, - { it.reset() } - ) + fun resetState() = consumerState.getOption().fold({ }, { it.reset() }) + - fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages()) private fun currentMessages(): List<ByteArray> = consumerState.getOption() diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 47a2d22a..144aab02 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,6 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser @@ -32,6 +27,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIX import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.io.InputStream import javax.json.Json @@ -39,20 +35,21 @@ class MessageStreamValidation( private val messageGenerator: VesEventGenerator, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = - IO.monadError().bindingCatch { - val messageParams = parseMessageParams(jsonDescription) - logger.debug { "Parsed message parameters: $messageParams" } - - val expectedEvents = generateEvents(messageParams).bind() - val actualEvents = decodeConsumedEvents(consumedMessages) - - if (shouldValidatePayloads(messageParams)) - expectedEvents == actualEvents - else - validateHeaders(actualEvents, expectedEvents) - - }.fix() + fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>) = + Mono + .fromSupplier { parseMessageParams(jsonDescription) } + .doOnNext { + logger.debug { "Parsed message parameters: $it" } + } + .flatMap { messageParams -> + val actualEvents = decodeConsumedEvents(consumedMessages) + generateEvents(messageParams).map { + if (shouldValidatePayloads(messageParams)) + it == actualEvents + else + validateHeaders(actualEvents, it) + } + } private fun parseMessageParams(input: InputStream): List<VesEventParameters> { val paramsArray = Json.createReader(input).readArray() @@ -97,11 +94,10 @@ class MessageStreamValidation( return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux + private fun generateEvents(parameters: List<VesEventParameters>): Mono<List<VesEvent>> = Flux .fromIterable(parameters) .flatMap { messageGenerator.createMessageFlux(it) } .collectList() - .asIo() private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = consumedMessages.map(VesEvent::parseFrom) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index d2c5b275..5d2977e4 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -50,9 +50,9 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { ) } - fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> = - simulator.listenToTopics(kafkaTopics).map { + IO { + simulator.listenToTopics(kafkaTopics) HttpServer.create() .host(socketAddress.hostName) .port(socketAddress.port) @@ -60,22 +60,21 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { .let { NettyServerHandle(it.bindNow()) } } - private fun setRoutes(route: HttpServerRoutes) { route .put("/configuration/topics") { req, res -> req .receive().aggregate().asString() .flatMap { - val option = simulator.listenToTopics(it) - res.sendOrError(option).then() + res.sendOrError{ simulator.listenToTopics(it) } } } .delete("/messages") { _, res -> logger.info { "Resetting simulator state" } + res .header("Content-type", CONTENT_TEXT) - .sendOrError(simulator.resetState()) + .sendOrError { simulator.resetState() } } .get("/messages/all/count") { _, res -> logger.info { "Processing request for count of received messages" } @@ -93,12 +92,13 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { .post("/messages/all/validate") { req, res -> req .receive().aggregate().asInputStream() - .flatMap { body -> + .map { logger.info { "Processing request for message validation" } - val response = - simulator.validate(body) - .map(::resolveValidationResponse) - res.sendAndHandleErrors(response).then() + simulator.validate(it) + .map(::resolveValidationResponse) + } + .flatMap { + res.sendAndHandleErrors(it) } } .get("/healthcheck") { _, res -> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index 10dedbdf..7bab9676 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,11 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer -import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions @@ -34,11 +32,10 @@ import reactor.kafka.receiver.ReceiverOptions */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): IO<Consumer> = IO { - val consumer = Consumer() - receiver.receive().map(consumer::update).evaluateIo().subscribe() - consumer - } + fun start() = Consumer() + .also { consumer -> + receiver.receive().map(consumer::update) + } companion object { private val logger = Logger(KafkaSource::class) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 1eefdbdb..725248ce 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord @@ -41,7 +40,7 @@ class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) { interface ConsumerStateProvider { fun currentState(): ConsumerState - fun reset(): IO<Unit> + fun reset() } class Consumer : ConsumerStateProvider { @@ -50,11 +49,9 @@ class Consumer : ConsumerStateProvider { override fun currentState(): ConsumerState = ConsumerState(consumedMessages) - override fun reset(): IO<Unit> = IO { - consumedMessages.clear() - } + override fun reset() = consumedMessages.clear() - fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> { + fun update(record: ReceiverRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -65,6 +62,6 @@ class Consumer : ConsumerStateProvider { } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> = + fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer = KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt index 08558d76..e8ac6cd5 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ internal class ConsumerTest : Spek({ topic = "topic", key = byteArrayOf(1), value = value - )).unsafeRunSync() + )) } it("should contain one message if it was updated once") { @@ -60,7 +60,7 @@ internal class ConsumerTest : Spek({ } it("should contain empty state message if it was reset after update") { - cut.reset().unsafeRunSync() + cut.reset() assertEmptyState(cut) } } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index e1641cbb..493100fc 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,8 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.Left import arrow.core.None import arrow.core.Some -import arrow.effects.IO import com.google.protobuf.ByteString import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq @@ -30,14 +28,18 @@ import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.never import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.* import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.mockito.ArgumentMatchers.anySet import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Mono +import reactor.test.StepVerifier +import java.lang.IllegalArgumentException import java.util.concurrent.ConcurrentLinkedQueue +import kotlin.test.assertFailsWith /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -55,7 +57,7 @@ internal class DcaeAppSimulatorTest : Spek({ consumer = mock() cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer)) + whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer) } fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList())) @@ -64,48 +66,36 @@ internal class DcaeAppSimulatorTest : Spek({ val topics = setOf("perf3gpp", "faults") it("should fail when topic list is empty") { - val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync() - assertThat(result.isLeft()).isTrue() + assertFailsWith(IllegalArgumentException::class){ + cut.listenToTopics(setOf()) + } } it("should fail when topic list contains empty strings") { - val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync() - assertThat(result.isLeft()).isTrue() + assertFailsWith(IllegalArgumentException::class){ + cut.listenToTopics(setOf("perf3gpp", " ", "faults")) + } } it("should subscribe to given topics") { - cut.listenToTopics(topics).unsafeRunSync() + cut.listenToTopics(topics) verify(consumerFactory).createConsumerForTopics(topics) } it("should subscribe to given topics when called with comma separated list") { - cut.listenToTopics("perf3gpp,faults").unsafeRunSync() + cut.listenToTopics("perf3gpp,faults") verify(consumerFactory).createConsumerForTopics(topics) } - - it("should handle errors") { - // given - val error = RuntimeException("WTF") - whenever(consumerFactory.createConsumerForTopics(anySet())) - .thenReturn(IO.raiseError(error)) - - // when - val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync() - - // then - assertThat(result).isEqualTo(Left(error)) - } } describe("state") { - it("should return None when topics hasn't been initialized") { assertThat(cut.state()).isEqualTo(None) } describe("when topics are initialized") { beforeEachTest { - cut.listenToTopics("perf3gpp").unsafeRunSync() + cut.listenToTopics("perf3gpp") } it("should return some state when it has been set") { @@ -119,21 +109,18 @@ internal class DcaeAppSimulatorTest : Spek({ describe("resetState") { it("should do nothing when topics hasn't been initialized") { - cut.resetState().unsafeRunSync() + cut.resetState() verify(consumer, never()).reset() } describe("when topics are initialized") { beforeEachTest { - cut.listenToTopics("perf3gpp").unsafeRunSync() + cut.listenToTopics("perf3gpp") } it("should reset the state") { - // given - whenever(consumer.reset()).thenReturn(IO.unit) - // when - cut.resetState().unsafeRunSync() + cut.resetState() // then verify(consumer).reset() @@ -143,29 +130,30 @@ internal class DcaeAppSimulatorTest : Spek({ describe("validate") { beforeEachTest { - whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true)) + whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true)) } it("should use empty list when consumer is unavailable") { - // when - val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + StepVerifier + .create(cut.validate("['The JSON']".byteInputStream())) + .expectNext(true) + .verifyComplete() - // then verify(messageStreamValidation).validate(any(), eq(emptyList())) - assertThat(result).isTrue() } it("should delegate to MessageStreamValidation") { // given - cut.listenToTopics("perf3gpp").unsafeRunSync() + cut.listenToTopics("perf3gpp") whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) - // when - val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync() + StepVerifier + .create(cut.validate("['The JSON']".byteInputStream())) + .expectNext(true) + .verifyComplete() // then verify(messageStreamValidation).validate(any(), any()) - assertThat(result).isTrue() } } }) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index bff7709d..88867999 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,9 @@ import com.google.protobuf.ByteString import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.tests.utils.assertFailedWithError import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters @@ -38,6 +36,8 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventG import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux +import reactor.test.StepVerifier +import java.lang.IllegalArgumentException import javax.json.stream.JsonParsingException /** @@ -60,24 +60,22 @@ internal class MessageStreamValidationTest : Spek({ } describe("validate") { - it("should return error when JSON is invalid") { - // when - val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync() - - // then - result.assertFailedWithError { it is JsonParsingException } + StepVerifier + .create(cut.validate("[{invalid json}]".byteInputStream(), listOf())) + .expectError(JsonParsingException::class.java) + .verify() } it("should return error when message param list is empty") { - // given - givenParsedMessageParameters() - - // when - val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync() + // given + givenParsedMessageParameters() - // then - result.assertFailedWithError { it is IllegalArgumentException } + //when + StepVerifier + .create(cut.validate(sampleJsonAsStream(), listOf())) + .expectError(IllegalArgumentException::class.java) + .verify() } describe("when validating headers only") { @@ -90,11 +88,10 @@ internal class MessageStreamValidationTest : Spek({ givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, VALID, 1)) whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event)) - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isTrue() + StepVerifier + .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes))) + .expectNext(true) + .verifyComplete() } it("should return true when messages differ with payload only") { @@ -108,11 +105,10 @@ internal class MessageStreamValidationTest : Spek({ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1)) whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isTrue() + StepVerifier + .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes))) + .expectNext(true) + .verifyComplete() } it("should return false when messages are different") { @@ -125,11 +121,10 @@ internal class MessageStreamValidationTest : Spek({ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1)) whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isFalse() + StepVerifier + .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes))) + .expectNext(false) + .verifyComplete() } } @@ -143,11 +138,10 @@ internal class MessageStreamValidationTest : Spek({ givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, FIXED_PAYLOAD, 1)) whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event)) - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isTrue() + StepVerifier + .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes))) + .expectNext(true) + .verifyComplete() } it("should return false when messages differ with payload only") { @@ -160,11 +154,10 @@ internal class MessageStreamValidationTest : Spek({ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1)) whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isFalse() + StepVerifier + .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes))) + .expectNext(false) + .verifyComplete() } it("should return false when messages are different") { @@ -177,11 +170,10 @@ internal class MessageStreamValidationTest : Spek({ givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1)) whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent)) - // when - val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync() - - // then - assertThat(result).isFalse() + StepVerifier + .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes))) + .expectNext(false) + .verifyComplete() } } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt index cf338a70..f133d630 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt @@ -20,50 +20,53 @@ package org.onap.dcae.collectors.veshv.utils.http import arrow.core.Either -import arrow.effects.IO import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono +import reactor.core.publisher.toMono import reactor.netty.NettyOutbound import reactor.netty.http.server.HttpServerResponse import javax.json.Json private val logger = Logger("org.onap.dcae.collectors.veshv.utils.http.netty") -fun HttpServerResponse.sendOrError(action: IO<Unit>): NettyOutbound = - sendAndHandleErrors(action.map { - Response( - HttpStatus.OK, - Content( - ContentType.JSON, - Json.createObjectBuilder().add("response", "Request accepted").build() +fun HttpServerResponse.sendOrError(action: ()->Unit) = sendAndHandleErrors( + Mono + .fromSupplier(action) + .map { + Response( + HttpStatus.OK, + Content( + ContentType.JSON, + Json.createObjectBuilder().add("response", "Request accepted").build() + ) ) - ) - }) - + } +) -fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutbound = - response.attempt().unsafeRunSync().fold( - { err -> - logger.withWarn { log("Error occurred. Sending .", err) } - val message = err.message - sendResponse(errorResponse(message)) - }, - { - sendResponse(it) +fun HttpServerResponse.sendAndHandleErrors(response: Mono<Response>) = + response + .onErrorResume { + logger.withWarn { log("Error occurred. Sending .", it) } + errorResponse(it.localizedMessage).toMono() + } + .flatMap { + sendResponse(it).then() } - ) -fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>): NettyOutbound = +fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>) = when (response) { - is Either.Left -> sendResponse(errorResponse(response.a.toString())) - is Either.Right -> sendAndHandleErrors(IO.just(response.b)) + is Either.Left -> sendResponse(errorResponse(response.a.toString())).then() + is Either.Right -> sendAndHandleErrors(Mono.just(response.b)) } -private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound { + +fun HttpServerResponse.sendResponse(response: Response): NettyOutbound { val respWithStatus = status(response.status.number) val responseContent = response.content - return respWithStatus.sendString(Mono.just(responseContent.serializer.run { responseContent.value.show() })) + return respWithStatus.sendString( + Mono.just(responseContent.serializer.run { responseContent.value.show() }) + ) } private fun errorResponse(message: String?): Response = diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index 7df7283b..fb2c532f 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -72,7 +72,7 @@ internal class XnfApiServer( is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" } is Either.Right -> logger.info { "Scenario started, details: ${id.b}" } } - res.sendEitherErrorOrResponse(id).then() + res.sendEitherErrorOrResponse(id) } } @@ -90,7 +90,7 @@ internal class XnfApiServer( val status = ongoingSimulations.status(id) val response = Responses.statusResponse(status.toString(), status.message) logger.info { "Task $id status: $response" } - return res.sendAndHandleErrors(IO.just(response)).then() + return res.sendAndHandleErrors(Mono.just(response)) } companion object { |