diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-24 12:51:14 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-31 12:16:08 +0200 |
commit | d00acee05c05c7e3146abf7d13b78953f9a0d3f9 (patch) | |
tree | 521e0ef361bf176ce96ad6c718f198f464412b1c /hv-collector-dcae-app-simulator/src/main | |
parent | 5bdae83e8b93cebbb84f56d5830beb726a164d76 (diff) |
Improve DCAE APP Simulator coverage
Also there was a need to refactor the code, because application logic
was placed inside Ratpack handlers.
Change-Id: Iba3d4d039a98ba88e0dba580c1b7726b53440538
Issue-ID: DCAEGEN2-732
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main')
9 files changed, 321 insertions, 196 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt new file mode 100644 index 00000000..262e05bf --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.core.getOrElse +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.io.InputStream +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, + private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) { + private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() + + fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) + + fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch { + if (topics.any { it.isBlank() }) + throw IllegalArgumentException("Topic list cannot contain empty elements") + if (topics.isEmpty()) + throw IllegalArgumentException("Topic list cannot be empty") + + logger.info("Received new configuration. Creating consumer for topics: $topics") + consumerState.set(consumerFactory.createConsumerForTopics(topics).bind()) + }.fix() + + fun state() = consumerState.getOption().map { it.currentState() } + + fun resetState(): IO<Unit> = consumerState.getOption().fold( + { IO.unit }, + { it.reset() } + ) + + fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages()) + + private fun currentMessages(): List<ByteArray> = + consumerState.getOption() + .map { it.currentState().consumedMessages } + .getOrElse(::emptyList) + + private fun extractTopics(topicsString: String): Set<String> = + topicsString.substringAfter("=") + .split(",") + .toSet() + + companion object { + private val logger = Logger(DcaeAppSimulator::class) + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt new file mode 100644 index 00000000..239f7102 --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl + +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monadError +import arrow.typeclasses.bindingCatch +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType +import org.onap.ves.VesEventV5 +import java.io.InputStream +import javax.json.Json + +class MessageStreamValidation( + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, + private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + + fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = + IO.monadError().bindingCatch { + val messageParams = parseMessageParams(jsonDescription) + val expectedEvents = generateEvents(messageParams).bind() + val actualEvents = decodeConsumedEvents(consumedMessages) + if (shouldValidatePayloads(messageParams)) { + expectedEvents == actualEvents + } else { + validateHeaders(actualEvents, expectedEvents) + } + }.fix() + + private fun parseMessageParams(input: InputStream): List<MessageParameters> { + val expectations = Json.createReader(input).readArray() + val messageParams = messageParametersParser.parse(expectations) + + if (messageParams.isEmpty()) + throw IllegalArgumentException("Message param list cannot be empty") + + return messageParams + } + + private fun shouldValidatePayloads(parameters: List<MessageParameters>) = + parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } + + + private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean { + val consumedHeaders = actual.map { it.commonEventHeader } + val generatedHeaders = expected.map { it.commonEventHeader } + return generatedHeaders == consumedHeaders + } + + + private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> = + messageGenerator.createMessageFlux(parameters) + .map(PayloadWireFrameMessage::payload) + .map(ByteData::unsafeAsArray) + .map(VesEventV5.VesEvent::parseFrom) + .collectList() + .asIo() + + private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = + consumedMessages.map(VesEventV5.VesEvent::parseFrom) + +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt new file mode 100644 index 00000000..6c830b9d --- /dev/null +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt @@ -0,0 +1,129 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters + +import arrow.core.Left +import arrow.core.Right +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import ratpack.exec.Promise +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.http.Response +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class ApiServer(private val simulator: DcaeAppSimulator) { + + + fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = + simulator.listenToTopics(kafkaTopics).map { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(::setupHandlers) + } + } + + private fun setupHandlers(chain: Chain) { + chain + .put("configuration/topics") { ctx -> + val operation = ctx.bodyIo().flatMap { body -> + simulator.listenToTopics(body.text) + } + ctx.response.sendOrError(operation) + + } + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + ctx.response.sendOrError(simulator.resetState()) + } + .get("messages/all/count") { ctx -> + simulator.state().fold( + { ctx.response.status(STATUS_NOT_FOUND) }, + { + ctx.response + .contentType(CONTENT_TEXT) + .send(it.messagesCount.toString()) + }) + } + .post("messages/all/validate") { ctx -> + val responseStatus = IO.monad().binding { + val body = ctx.bodyIo().bind() + val isValid = simulator.validate(body.inputStream).bind() + if (isValid) + STATUS_OK + else + STATUS_BAD_REQUEST + }.fix() + + ctx.response.sendStatusOrError(responseStatus) + } + .get("healthcheck") { ctx -> + ctx.response.status(STATUS_OK).send() + } + } + + private fun Context.bodyIo() = request.body.asIo() + + private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult -> + onError { + emitResult(Left(it)) + }.then { result -> + emitResult(Right(result)) + } + } + + private fun Response.sendOrError(responseStatus: IO<Unit>) { + sendStatusOrError(responseStatus.map { STATUS_OK }) + } + + private fun Response.sendStatusOrError(responseStatus: IO<Int>) { + responseStatus.unsafeRunAsync { cb -> + cb.fold( + { err -> + logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err) + status(ApiServer.STATUS_INTERNAL_SERVER_ERROR) + .send(CONTENT_TEXT, err.message) + }, + { + status(it).send() + } + ) + } + } + + companion object { + private val logger = Logger(ApiServer::class) + private const val CONTENT_TEXT = "text/plain" + + private const val STATUS_OK = 200 + private const val STATUS_BAD_REQUEST = 400 + private const val STATUS_NOT_FOUND = 404 + private const val STATUS_INTERNAL_SERVER_ERROR = 500 + } +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index d53609ca..15965174 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -17,15 +17,16 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer +import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions -import java.util.* /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,7 +36,7 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { fun start(): IO<Consumer> = IO { val consumer = Consumer() - receiver.receive().subscribe(consumer::update) + receiver.receive().map(consumer::update).evaluateIo().subscribe() consumer } @@ -43,18 +44,22 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { private val logger = Logger(KafkaSource::class) fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { - val props = HashMap<String, Any>() - props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator" - props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators" - props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" - val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props) + return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) + } + + fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { + val props = mapOf<String, Any>( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", + ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" + ) + return ReceiverOptions.create<ByteArray, ByteArray>(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } .subscription(topics) - return KafkaSource(KafkaReceiver.create(receiverOptions)) } } } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index 065cdf92..d5f55605 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config import arrow.core.ForOption import arrow.core.Option diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt index 5bd2d155..c114313d 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config data class DcaeAppSimConfiguration( val apiPort: Int, diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 08bb149f..1eefdbdb 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -17,9 +17,10 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord import java.util.concurrent.ConcurrentLinkedQueue @@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){ +class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) { val messagesCount: Int by lazy { messages.size } @@ -53,19 +54,17 @@ class Consumer : ConsumerStateProvider { consumedMessages.clear() } - fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } - companion object { private val logger = Logger(Consumer::class) } } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider { - return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync() - } + fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 9f84fc4d..a65a2686 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,10 +20,12 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import arrow.effects.IO -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgDcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.void @@ -50,7 +52,7 @@ fun main(args: Array<String>) = private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - return ApiServer(ConsumerFactory(config.kafkaBootstrapServers)) + return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) .start(config.apiPort, config.kafkaTopics) .void() -}
\ No newline at end of file +} diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt deleted file mode 100644 index cd258134..00000000 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ /dev/null @@ -1,169 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD -import org.onap.ves.VesEventV5.VesEvent -import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import reactor.core.publisher.Mono -import javax.json.Json - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class ApiServer(private val consumerFactory: ConsumerFactory, - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { - - private lateinit var consumerState: ConsumerStateProvider - - fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO { - consumerState = consumerFactory.createConsumerForTopics(kafkaTopics) - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(this::setupHandlers) - } - } - - private fun setupHandlers(chain: Chain) { - chain - .put("configuration/topics") { ctx -> - ctx.request.body.then { it -> - val topics = extractTopics(it.text) - logger.info("Received new configuration. Creating consumer for topics: $topics") - consumerState = consumerFactory.createConsumerForTopics(topics) - ctx.response - .status(STATUS_OK) - .send() - } - - } - .delete("messages") { ctx -> - ctx.response.contentType(CONTENT_TEXT) - consumerState.reset() - .unsafeRunAsync { - it.fold( - { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) }, - { ctx.response.status(STATUS_OK) } - ).send() - } - } - .get("messages/all/count") { ctx -> - val state = consumerState.currentState() - ctx.response - .contentType(CONTENT_TEXT) - .send(state.messagesCount.toString()) - } - .post("messages/all/validate") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { generateEvents(ctx, it) } - .then { (generatedEvents, shouldValidatePayloads) -> - generatedEvents - .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) } - .block() - } - } - .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() - } - } - - private fun generateEvents(ctx: Context, parameters: List<MessageParameters>): - Pair<Mono<List<VesEvent>>, Boolean> = Pair( - - doGenerateEvents(parameters).doOnError { - logger.error("Error occurred when generating messages: $it") - ctx.response - .status(STATUS_INTERNAL_SERVER_ERROR) - .send() - }, - parameters.all { it.messageType == FIXED_PAYLOAD } - ) - - private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE - .createMessageFlux(parameters) - .map(PayloadWireFrameMessage::payload) - .map { decode(it.unsafeAsArray()) } - .collectList() - - - private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes) - - - private fun sendResponse(ctx: Context, - generatedEvents: List<VesEvent>, - shouldValidatePayloads: Boolean) = - resolveResponseStatusCode( - generated = generatedEvents, - consumed = decodeConsumedEvents(), - validatePayloads = shouldValidatePayloads - ).let { ctx.response.status(it).send() } - - - private fun decodeConsumedEvents(): List<VesEvent> = consumerState - .currentState() - .consumedMessages - .map(::decode) - - - private fun resolveResponseStatusCode(generated: List<VesEvent>, - consumed: List<VesEvent>, - validatePayloads: Boolean): Int = - if (validatePayloads) { - if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST - } else { - validateHeaders(consumed, generated) - } - - private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int { - val consumedHeaders = consumed.map { it.commonEventHeader } - val generatedHeaders = generated.map { it.commonEventHeader } - return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST - } - - private fun extractTopics(it: String): Set<String> = - it.substringAfter("=") - .split(",") - .toSet() - - companion object { - private val logger = Logger(ApiServer::class) - private const val CONTENT_TEXT = "text/plain" - - private const val STATUS_OK = 200 - private const val STATUS_BAD_REQUEST = 400 - private const val STATUS_INTERNAL_SERVER_ERROR = 500 - } -} - - |