diff options
author | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-03-20 14:59:24 +0100 |
---|---|---|
committer | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-03-28 11:53:11 +0100 |
commit | 75b95caba1ec7e126683be1df5928a380174d428 (patch) | |
tree | 500c5262fb7c78eb39e458a09bc4a7edd7a01678 /sources/hv-collector-dcae-app-simulator/src/main/kotlin/org | |
parent | 58ae1a831a6fe85abda8c4d866e5170c70499ac1 (diff) |
Remove IO monad usage from simulators
Change-Id: I1c470777b91230f4a44a4960ca534e4b20c1ac43
Issue-ID: DCAEGEN2-1372
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin/org')
5 files changed, 47 insertions, 61 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index f7d94de5..28866f36 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.core.getOrElse -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono import java.io.InputStream +import java.lang.IllegalArgumentException import java.util.concurrent.atomic.AtomicReference /** @@ -39,7 +37,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) - fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch { + fun listenToTopics(topics: Set<String>) { if (topics.isEmpty() || topics.any { it.isBlank() }) { val message = "Topic list cannot be empty or contain empty elements, topics: $topics" logger.info { message } @@ -47,17 +45,15 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, } logger.info { "Received new configuration. Creating consumer for topics: $topics" } - consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) - }.fix() + consumerState.set(consumerFactory.createConsumerForTopics(topics)) + } fun state() = consumerState.getOption().map { it.currentState() } - fun resetState(): IO<Unit> = consumerState.getOption().fold( - { IO.unit }, - { it.reset() } - ) + fun resetState() = consumerState.getOption().fold({ }, { it.reset() }) + - fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages()) private fun currentMessages(): List<ByteArray> = consumerState.getOption() diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 47a2d22a..144aab02 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,6 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.effects.IO -import arrow.effects.fix -import arrow.effects.instances.io.monadError.monadError -import arrow.typeclasses.bindingCatch -import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser @@ -32,6 +27,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIX import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.io.InputStream import javax.json.Json @@ -39,20 +35,21 @@ class MessageStreamValidation( private val messageGenerator: VesEventGenerator, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = - IO.monadError().bindingCatch { - val messageParams = parseMessageParams(jsonDescription) - logger.debug { "Parsed message parameters: $messageParams" } - - val expectedEvents = generateEvents(messageParams).bind() - val actualEvents = decodeConsumedEvents(consumedMessages) - - if (shouldValidatePayloads(messageParams)) - expectedEvents == actualEvents - else - validateHeaders(actualEvents, expectedEvents) - - }.fix() + fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>) = + Mono + .fromSupplier { parseMessageParams(jsonDescription) } + .doOnNext { + logger.debug { "Parsed message parameters: $it" } + } + .flatMap { messageParams -> + val actualEvents = decodeConsumedEvents(consumedMessages) + generateEvents(messageParams).map { + if (shouldValidatePayloads(messageParams)) + it == actualEvents + else + validateHeaders(actualEvents, it) + } + } private fun parseMessageParams(input: InputStream): List<VesEventParameters> { val paramsArray = Json.createReader(input).readArray() @@ -97,11 +94,10 @@ class MessageStreamValidation( return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux + private fun generateEvents(parameters: List<VesEventParameters>): Mono<List<VesEvent>> = Flux .fromIterable(parameters) .flatMap { messageGenerator.createMessageFlux(it) } .collectList() - .asIo() private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = consumedMessages.map(VesEvent::parseFrom) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index d2c5b275..5d2977e4 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -50,9 +50,9 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { ) } - fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> = - simulator.listenToTopics(kafkaTopics).map { + IO { + simulator.listenToTopics(kafkaTopics) HttpServer.create() .host(socketAddress.hostName) .port(socketAddress.port) @@ -60,22 +60,21 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { .let { NettyServerHandle(it.bindNow()) } } - private fun setRoutes(route: HttpServerRoutes) { route .put("/configuration/topics") { req, res -> req .receive().aggregate().asString() .flatMap { - val option = simulator.listenToTopics(it) - res.sendOrError(option).then() + res.sendOrError{ simulator.listenToTopics(it) } } } .delete("/messages") { _, res -> logger.info { "Resetting simulator state" } + res .header("Content-type", CONTENT_TEXT) - .sendOrError(simulator.resetState()) + .sendOrError { simulator.resetState() } } .get("/messages/all/count") { _, res -> logger.info { "Processing request for count of received messages" } @@ -93,12 +92,13 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { .post("/messages/all/validate") { req, res -> req .receive().aggregate().asInputStream() - .flatMap { body -> + .map { logger.info { "Processing request for message validation" } - val response = - simulator.validate(body) - .map(::resolveValidationResponse) - res.sendAndHandleErrors(response).then() + simulator.validate(it) + .map(::resolveValidationResponse) + } + .flatMap { + res.sendAndHandleErrors(it) } } .get("/healthcheck") { _, res -> diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index 10dedbdf..7bab9676 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,11 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer -import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions @@ -34,11 +32,10 @@ import reactor.kafka.receiver.ReceiverOptions */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): IO<Consumer> = IO { - val consumer = Consumer() - receiver.receive().map(consumer::update).evaluateIo().subscribe() - consumer - } + fun start() = Consumer() + .also { consumer -> + receiver.receive().map(consumer::update) + } companion object { private val logger = Logger(KafkaSource::class) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 1eefdbdb..725248ce 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord @@ -41,7 +40,7 @@ class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) { interface ConsumerStateProvider { fun currentState(): ConsumerState - fun reset(): IO<Unit> + fun reset() } class Consumer : ConsumerStateProvider { @@ -50,11 +49,9 @@ class Consumer : ConsumerStateProvider { override fun currentState(): ConsumerState = ConsumerState(consumedMessages) - override fun reset(): IO<Unit> = IO { - consumedMessages.clear() - } + override fun reset() = consumedMessages.clear() - fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> { + fun update(record: ReceiverRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -65,6 +62,6 @@ class Consumer : ConsumerStateProvider { } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> = + fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer = KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() } |