diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-04-05 08:44:52 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-04-10 09:05:59 +0200 |
commit | f864f9ae0cf8cef72b64cbda8964e86398b3f749 (patch) | |
tree | f5d45cda36f4ebe5f5466a52a459c0ac7976773b /sources/hv-collector-dcae-app-simulator/src/main/kotlin | |
parent | c9829d23c12b2824a0d56ee6efbd00ad67b9046e (diff) |
Allow retrieving multiple kafka topics status
Change-Id: I5e8433873e5d594e6df9da8c4893b0f54614efae
Issue-ID: DCAEGEN2-1399
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin')
4 files changed, 120 insertions, 62 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 93c12d25..33e9a37e 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -19,11 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.getOrElse -import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import arrow.core.Option import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.io.InputStream -import java.util.concurrent.atomic.AtomicReference +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference */ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() + private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf()) fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) @@ -42,24 +41,42 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, throw IllegalArgumentException(message) } - logger.info { "Received new configuration. Creating consumer for topics: $topics" } - consumerState.set(consumerFactory.createConsumerForTopics(topics)) + logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" } + synchronized(consumerState) { + consumerState.clear() + consumerState.putAll(consumerFactory.createConsumersForTopics(topics)) + } } - fun state() = consumerState.getOption().map { it.currentState() } + fun state(topic: String) = + consumerState(topic) + .map(ConsumerStateProvider::currentState) + .toEither { + val message = "Failed to return consumer state. No consumer found for topic: $topic" + logger.warn { message } + MissingConsumerException(message) + } + + fun resetState(topic: String) = + consumerState(topic) + .map { it.reset() } + .toEither { + val message = "Failed to reset consumer state. No consumer found for topic: $topic" + logger.warn { message } + MissingConsumerException(message) + } - fun resetState() = consumerState.getOption().fold({ }, { it.reset() }) + fun validate(jsonDescription: InputStream, topic: String) = + messageStreamValidation.validate(jsonDescription, currentMessages(topic)) + private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic]) - fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages()) - private fun currentMessages(): List<ByteArray> = - consumerState.getOption() - .map { it.currentState().consumedMessages } - .getOrElse(::emptyList) + private fun currentMessages(topic: String): List<ByteArray> = + state(topic).fold({ emptyList() }, { it.consumedMessages }) private fun extractTopics(topicsString: String): Set<String> = - topicsString.substringAfter("=") + topicsString.removeSurrounding("\"") .split(",") .toSet() @@ -67,3 +84,5 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, private val logger = Logger(DcaeAppSimulator::class) } } + +class MissingConsumerException(message: String) : Throwable(message) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index f3fd56bb..6a09be9f 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +import arrow.core.Option import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -26,11 +27,13 @@ import org.onap.dcae.collectors.veshv.utils.http.HttpConstants import org.onap.dcae.collectors.veshv.utils.http.HttpStatus import org.onap.dcae.collectors.veshv.utils.http.Response import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.Responses.stringResponse import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors import org.onap.dcae.collectors.veshv.utils.http.sendOrError import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRequest import reactor.netty.http.server.HttpServerRoutes import java.net.InetSocketAddress @@ -39,20 +42,6 @@ import java.net.InetSocketAddress * @since May 2018 */ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { - private val responseValid by lazy { - Responses.statusResponse( - name = "valid", - message = VALID_RESPONSE_MESSAGE - ) - } - - private val responseInvalid by lazy { - Responses.statusResponse( - name = "invalid", - message = INVALID_RESPONSE_MESSAGE, - httpStatus = HttpStatus.BAD_REQUEST - ) - } fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> = Mono.defer { @@ -74,37 +63,49 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { res.sendOrError { simulator.listenToTopics(it) } } } - .delete("/messages") { _, res -> - logger.info { "Resetting simulator state" } + .delete("/messages/{$TOPIC_PARAM_KEY}") { req, res -> + doWithTopicOrReturnInternalErrorResponse(req) { + logger.info { "Resetting simulator state for topic $it" } + simulator.resetState(it) + Mono.just(Responses.Success) + }.let(res::sendAndHandleErrors) - res - .header("Content-type", CONTENT_TEXT) - .sendOrError { simulator.resetState() } } - .get("/messages/all/count") { _, res -> - logger.info { "Processing request for count of received messages" } - simulator.state().fold( - { - logger.warn { "Error - number of messages could not be specified" } - res.status(HttpConstants.STATUS_NOT_FOUND) - }, - { - logger.info { "Returned number of received messages: ${it.messagesCount}" } - res.sendString(Mono.just(it.messagesCount.toString())) - } - ) + .get("/messages/{$TOPIC_PARAM_KEY}/count") { req, res -> + doWithTopicOrReturnInternalErrorResponse(req) { + logger.info { "Processing request for count of received messages for topic $it" } + simulator.state(it) + .fold({ + val errorMessage = { COUNT_NOT_RESOLVED_MESSAGE + ". Reason: ${it.message}" } + logger.warn(errorMessage) + Mono.just(Responses.statusResponse( + name = "Count not found", + message = errorMessage(), + httpStatus = HttpStatus.NOT_FOUND + ) + ) + }, { + logger.info { "Returned number of received messages: ${it.messagesCount}" } + Mono.just( + Responses.stringResponse( + message = it.messagesCount.toString(), + httpStatus = HttpStatus.OK + ) + ) + }) + }.let(res::sendAndHandleErrors) } - .post("/messages/all/validate") { req, res -> + .post("/messages/{$TOPIC_PARAM_KEY}/validate") { req, res -> req .receive().aggregate().asInputStream() - .map { - logger.info { "Processing request for message validation" } - simulator.validate(it) - .map(::resolveValidationResponse) - } - .flatMap { - res.sendAndHandleErrors(it) + .map { inputStream -> + doWithTopicOrReturnInternalErrorResponse(req) { + logger.info { "Processing request for message validation" } + simulator.validate(inputStream, it) + .map(::resolveValidationResponse) + } } + .flatMap(res::sendAndHandleErrors) } .get("/healthcheck") { _, res -> val status = HttpConstants.STATUS_OK @@ -113,6 +114,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } } + private fun doWithTopicOrReturnInternalErrorResponse(req: HttpServerRequest, + topicConsumer: (String) -> Mono<Response>) = + Option.fromNullable(req.param(TOPIC_PARAM_KEY)) + .fold({ + Mono.just( + stringResponse("Failed to retrieve parameter from url", + HttpStatus.INTERNAL_SERVER_ERROR)) + }, topicConsumer) + private fun resolveValidationResponse(isValid: Boolean): Response = if (isValid) { logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" } @@ -124,10 +134,26 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { companion object { - private const val CONTENT_TEXT = "text/plain" + private val logger = Logger(DcaeAppApiServer::class) private const val VALID_RESPONSE_MESSAGE = "validation passed" private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request" - private val logger = Logger(DcaeAppApiServer::class) + private const val COUNT_NOT_RESOLVED_MESSAGE = "Error - number of messages could not be specified" + private const val TOPIC_PARAM_KEY = "topic" + + private val responseValid by lazy { + Responses.statusResponse( + name = "valid", + message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE + ) + } + + private val responseInvalid by lazy { + Responses.statusResponse( + name = "invalid", + message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE, + httpStatus = HttpStatus.BAD_REQUEST + ) + } } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index 3314805e..0fd3bb10 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -21,10 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.receiver.ReceiverRecord /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -32,10 +33,9 @@ import reactor.kafka.receiver.ReceiverOptions */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start() = Consumer() - .also { consumer -> - receiver.receive().map(consumer::update).subscribe() - } + fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> = + receiver.receive() + .also { logger.info { "Started Kafka source" } } companion object { private val logger = Logger(KafkaSource::class) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 725248ce..a6d1eddb 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -62,6 +62,19 @@ class Consumer : ConsumerStateProvider { } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() + fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> + val topicToConsumer = kafkaTopics.associate { it to Consumer() } + kafkaSource.start() + .map { + val topic = it.topic() + topicToConsumer.get(topic)?.update(it) + ?: logger.warn { "No consumer configured for topic $topic" } + }.subscribe() + topicToConsumer + } + + companion object { + private val logger = Logger(ConsumerFactory::class) + } } |