From f864f9ae0cf8cef72b64cbda8964e86398b3f749 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 5 Apr 2019 08:44:52 +0200 Subject: Allow retrieving multiple kafka topics status Change-Id: I5e8433873e5d594e6df9da8c4893b0f54614efae Issue-ID: DCAEGEN2-1399 Signed-off-by: Filip Krzywka --- .../simulators/dcaeapp/impl/DcaeAppSimulator.kt | 47 ++++++--- .../dcaeapp/impl/adapters/DcaeAppApiServer.kt | 108 +++++++++++++-------- .../dcaeapp/impl/adapters/KafkaSource.kt | 10 +- .../veshv/simulators/dcaeapp/impl/consumer.kt | 17 +++- .../veshv/simulators/dcaeapp/impl/ConsumerTest.kt | 6 +- .../dcaeapp/impl/DcaeAppSimulatorTest.kt | 79 ++++++++------- 6 files changed, 166 insertions(+), 101 deletions(-) (limited to 'sources/hv-collector-dcae-app-simulator/src') diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 93c12d25..33e9a37e 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -19,11 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.getOrElse -import org.onap.dcae.collectors.veshv.utils.arrow.getOption +import arrow.core.Option import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.io.InputStream -import java.util.concurrent.atomic.AtomicReference +import java.util.Collections.synchronizedMap /** * @author Piotr Jaszczyk @@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference */ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: AtomicReference = AtomicReference() + private val consumerState: MutableMap = synchronizedMap(mutableMapOf()) fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) @@ -42,24 +41,42 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, throw IllegalArgumentException(message) } - logger.info { "Received new configuration. Creating consumer for topics: $topics" } - consumerState.set(consumerFactory.createConsumerForTopics(topics)) + logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" } + synchronized(consumerState) { + consumerState.clear() + consumerState.putAll(consumerFactory.createConsumersForTopics(topics)) + } } - fun state() = consumerState.getOption().map { it.currentState() } + fun state(topic: String) = + consumerState(topic) + .map(ConsumerStateProvider::currentState) + .toEither { + val message = "Failed to return consumer state. No consumer found for topic: $topic" + logger.warn { message } + MissingConsumerException(message) + } + + fun resetState(topic: String) = + consumerState(topic) + .map { it.reset() } + .toEither { + val message = "Failed to reset consumer state. No consumer found for topic: $topic" + logger.warn { message } + MissingConsumerException(message) + } - fun resetState() = consumerState.getOption().fold({ }, { it.reset() }) + fun validate(jsonDescription: InputStream, topic: String) = + messageStreamValidation.validate(jsonDescription, currentMessages(topic)) + private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic]) - fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages()) - private fun currentMessages(): List = - consumerState.getOption() - .map { it.currentState().consumedMessages } - .getOrElse(::emptyList) + private fun currentMessages(topic: String): List = + state(topic).fold({ emptyList() }, { it.consumedMessages }) private fun extractTopics(topicsString: String): Set = - topicsString.substringAfter("=") + topicsString.removeSurrounding("\"") .split(",") .toSet() @@ -67,3 +84,5 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, private val logger = Logger(DcaeAppSimulator::class) } } + +class MissingConsumerException(message: String) : Throwable(message) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index f3fd56bb..6a09be9f 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +import arrow.core.Option import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -26,11 +27,13 @@ import org.onap.dcae.collectors.veshv.utils.http.HttpConstants import org.onap.dcae.collectors.veshv.utils.http.HttpStatus import org.onap.dcae.collectors.veshv.utils.http.Response import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.Responses.stringResponse import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors import org.onap.dcae.collectors.veshv.utils.http.sendOrError import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRequest import reactor.netty.http.server.HttpServerRoutes import java.net.InetSocketAddress @@ -39,20 +42,6 @@ import java.net.InetSocketAddress * @since May 2018 */ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { - private val responseValid by lazy { - Responses.statusResponse( - name = "valid", - message = VALID_RESPONSE_MESSAGE - ) - } - - private val responseInvalid by lazy { - Responses.statusResponse( - name = "invalid", - message = INVALID_RESPONSE_MESSAGE, - httpStatus = HttpStatus.BAD_REQUEST - ) - } fun start(socketAddress: InetSocketAddress, kafkaTopics: Set): Mono = Mono.defer { @@ -74,37 +63,49 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { res.sendOrError { simulator.listenToTopics(it) } } } - .delete("/messages") { _, res -> - logger.info { "Resetting simulator state" } + .delete("/messages/{$TOPIC_PARAM_KEY}") { req, res -> + doWithTopicOrReturnInternalErrorResponse(req) { + logger.info { "Resetting simulator state for topic $it" } + simulator.resetState(it) + Mono.just(Responses.Success) + }.let(res::sendAndHandleErrors) - res - .header("Content-type", CONTENT_TEXT) - .sendOrError { simulator.resetState() } } - .get("/messages/all/count") { _, res -> - logger.info { "Processing request for count of received messages" } - simulator.state().fold( - { - logger.warn { "Error - number of messages could not be specified" } - res.status(HttpConstants.STATUS_NOT_FOUND) - }, - { - logger.info { "Returned number of received messages: ${it.messagesCount}" } - res.sendString(Mono.just(it.messagesCount.toString())) - } - ) + .get("/messages/{$TOPIC_PARAM_KEY}/count") { req, res -> + doWithTopicOrReturnInternalErrorResponse(req) { + logger.info { "Processing request for count of received messages for topic $it" } + simulator.state(it) + .fold({ + val errorMessage = { COUNT_NOT_RESOLVED_MESSAGE + ". Reason: ${it.message}" } + logger.warn(errorMessage) + Mono.just(Responses.statusResponse( + name = "Count not found", + message = errorMessage(), + httpStatus = HttpStatus.NOT_FOUND + ) + ) + }, { + logger.info { "Returned number of received messages: ${it.messagesCount}" } + Mono.just( + Responses.stringResponse( + message = it.messagesCount.toString(), + httpStatus = HttpStatus.OK + ) + ) + }) + }.let(res::sendAndHandleErrors) } - .post("/messages/all/validate") { req, res -> + .post("/messages/{$TOPIC_PARAM_KEY}/validate") { req, res -> req .receive().aggregate().asInputStream() - .map { - logger.info { "Processing request for message validation" } - simulator.validate(it) - .map(::resolveValidationResponse) - } - .flatMap { - res.sendAndHandleErrors(it) + .map { inputStream -> + doWithTopicOrReturnInternalErrorResponse(req) { + logger.info { "Processing request for message validation" } + simulator.validate(inputStream, it) + .map(::resolveValidationResponse) + } } + .flatMap(res::sendAndHandleErrors) } .get("/healthcheck") { _, res -> val status = HttpConstants.STATUS_OK @@ -113,6 +114,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { } } + private fun doWithTopicOrReturnInternalErrorResponse(req: HttpServerRequest, + topicConsumer: (String) -> Mono) = + Option.fromNullable(req.param(TOPIC_PARAM_KEY)) + .fold({ + Mono.just( + stringResponse("Failed to retrieve parameter from url", + HttpStatus.INTERNAL_SERVER_ERROR)) + }, topicConsumer) + private fun resolveValidationResponse(isValid: Boolean): Response = if (isValid) { logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" } @@ -124,10 +134,26 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { companion object { - private const val CONTENT_TEXT = "text/plain" + private val logger = Logger(DcaeAppApiServer::class) private const val VALID_RESPONSE_MESSAGE = "validation passed" private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request" - private val logger = Logger(DcaeAppApiServer::class) + private const val COUNT_NOT_RESOLVED_MESSAGE = "Error - number of messages could not be specified" + private const val TOPIC_PARAM_KEY = "topic" + + private val responseValid by lazy { + Responses.statusResponse( + name = "valid", + message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE + ) + } + + private val responseInvalid by lazy { + Responses.statusResponse( + name = "invalid", + message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE, + httpStatus = HttpStatus.BAD_REQUEST + ) + } } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index 3314805e..0fd3bb10 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -21,10 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.receiver.ReceiverRecord /** * @author Piotr Jaszczyk @@ -32,10 +33,9 @@ import reactor.kafka.receiver.ReceiverOptions */ class KafkaSource(private val receiver: KafkaReceiver) { - fun start() = Consumer() - .also { consumer -> - receiver.receive().map(consumer::update).subscribe() - } + fun start(): Flux> = + receiver.receive() + .also { logger.info { "Started Kafka source" } } companion object { private val logger = Logger(KafkaSource::class) diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 725248ce..a6d1eddb 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -62,6 +62,19 @@ class Consumer : ConsumerStateProvider { } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set): Consumer = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() + fun createConsumersForTopics(kafkaTopics: Set): Map = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> + val topicToConsumer = kafkaTopics.associate { it to Consumer() } + kafkaSource.start() + .map { + val topic = it.topic() + topicToConsumer.get(topic)?.update(it) + ?: logger.warn { "No consumer configured for topic $topic" } + }.subscribe() + topicToConsumer + } + + companion object { + private val logger = Logger(ConsumerFactory::class) + } } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt index e8ac6cd5..a594215b 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt @@ -67,16 +67,16 @@ internal class ConsumerTest : Spek({ } }) -fun assertEmptyState(cut: Consumer) { +private fun assertEmptyState(cut: Consumer) { assertState(cut) } -fun assertState(cut: Consumer, vararg values: ByteArray) { +private fun assertState(cut: Consumer, vararg values: ByteArray) { assertThat(cut.currentState().consumedMessages) .containsOnly(*values) assertThat(cut.currentState().messagesCount) .isEqualTo(values.size) } -fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = +private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) = ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index 493100fc..e3e61c81 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -19,8 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import arrow.core.None -import arrow.core.Some +import arrow.core.Right import com.google.protobuf.ByteString import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq @@ -41,6 +40,7 @@ import java.lang.IllegalArgumentException import java.util.concurrent.ConcurrentLinkedQueue import kotlin.test.assertFailsWith + /** * @author Piotr Jaszczyk * @since August 2018 @@ -48,82 +48,87 @@ import kotlin.test.assertFailsWith internal class DcaeAppSimulatorTest : Spek({ lateinit var consumerFactory: ConsumerFactory lateinit var messageStreamValidation: MessageStreamValidation - lateinit var consumer: Consumer + lateinit var perf3gpp_consumer: Consumer + lateinit var faults_consumer: Consumer lateinit var cut: DcaeAppSimulator beforeEachTest { consumerFactory = mock() messageStreamValidation = mock() - consumer = mock() + perf3gpp_consumer = mock() + faults_consumer = mock() cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer) + whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf( + PERF3GPP_TOPIC to perf3gpp_consumer, + FAULTS_TOPICS to faults_consumer)) } fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList())) describe("listenToTopics") { - val topics = setOf("perf3gpp", "faults") - it("should fail when topic list is empty") { - assertFailsWith(IllegalArgumentException::class){ + assertFailsWith(IllegalArgumentException::class) { cut.listenToTopics(setOf()) } } it("should fail when topic list contains empty strings") { - assertFailsWith(IllegalArgumentException::class){ - cut.listenToTopics(setOf("perf3gpp", " ", "faults")) + assertFailsWith(IllegalArgumentException::class) { + cut.listenToTopics(setOf(PERF3GPP_TOPIC, " ", FAULTS_TOPICS)) } } it("should subscribe to given topics") { - cut.listenToTopics(topics) - verify(consumerFactory).createConsumerForTopics(topics) + cut.listenToTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) } it("should subscribe to given topics when called with comma separated list") { - cut.listenToTopics("perf3gpp,faults") - verify(consumerFactory).createConsumerForTopics(topics) + cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS") + verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) } } describe("state") { - it("should return None when topics hasn't been initialized") { - assertThat(cut.state()).isEqualTo(None) + it("should return Left when topics hasn't been initialized") { + assertThat(cut.state(PERF3GPP_TOPIC).isLeft()).isTrue() } describe("when topics are initialized") { beforeEachTest { - cut.listenToTopics("perf3gpp") + cut.listenToTopics(TWO_TOPICS) } - it("should return some state when it has been set") { + it("should return state when it has been set") { val state = consumerState() - whenever(consumer.currentState()).thenReturn(state) + whenever(perf3gpp_consumer.currentState()).thenReturn(state) + whenever(faults_consumer.currentState()).thenReturn(state) - assertThat(cut.state()).isEqualTo(Some(state)) + assertThat(cut.state(PERF3GPP_TOPIC)).isEqualTo(Right(state)) + assertThat(cut.state(FAULTS_TOPICS)).isEqualTo(Right(state)) } } } describe("resetState") { it("should do nothing when topics hasn't been initialized") { - cut.resetState() - verify(consumer, never()).reset() + cut.resetState(PERF3GPP_TOPIC) + cut.resetState(FAULTS_TOPICS) + verify(perf3gpp_consumer, never()).reset() + verify(faults_consumer, never()).reset() } describe("when topics are initialized") { beforeEachTest { - cut.listenToTopics("perf3gpp") + cut.listenToTopics(TWO_TOPICS) } - it("should reset the state") { - // when - cut.resetState() + it("should reset the state of given topic consumer") { + cut.resetState(PERF3GPP_TOPIC) - // then - verify(consumer).reset() + verify(perf3gpp_consumer).reset() + verify(faults_consumer, never()).reset() } } } @@ -135,7 +140,7 @@ internal class DcaeAppSimulatorTest : Spek({ it("should use empty list when consumer is unavailable") { StepVerifier - .create(cut.validate("['The JSON']".byteInputStream())) + .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC)) .expectNext(true) .verifyComplete() @@ -143,22 +148,24 @@ internal class DcaeAppSimulatorTest : Spek({ } it("should delegate to MessageStreamValidation") { - // given - cut.listenToTopics("perf3gpp") - whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) + cut.listenToTopics(PERF3GPP_TOPIC) + whenever(perf3gpp_consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray())) - StepVerifier - .create(cut.validate("['The JSON']".byteInputStream())) - .expectNext(true) + StepVerifier + .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC)) + .expectNext(true) .verifyComplete() - // then verify(messageStreamValidation).validate(any(), any()) } } }) +private const val PERF3GPP_TOPIC = "perf3gpp" +private const val FAULTS_TOPICS = "faults" +private val TWO_TOPICS = setOf(PERF3GPP_TOPIC, FAULTS_TOPICS) + private const val DUMMY_EVENT_ID = "aaa" private const val DUMMY_PAYLOAD = "payload" -- cgit 1.2.3-korg