aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator/src/main/kotlin
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-04-05 08:44:52 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-04-10 09:05:59 +0200
commitf864f9ae0cf8cef72b64cbda8964e86398b3f749 (patch)
treef5d45cda36f4ebe5f5466a52a459c0ac7976773b /sources/hv-collector-dcae-app-simulator/src/main/kotlin
parentc9829d23c12b2824a0d56ee6efbd00ad67b9046e (diff)
Allow retrieving multiple kafka topics status
Change-Id: I5e8433873e5d594e6df9da8c4893b0f54614efae Issue-ID: DCAEGEN2-1399 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt47
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt108
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt10
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt17
4 files changed, 120 insertions, 62 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 93c12d25..33e9a37e 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -19,11 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.getOrElse
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.io.InputStream
-import java.util.concurrent.atomic.AtomicReference
+import java.util.Collections.synchronizedMap
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference
*/
class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
private val messageStreamValidation: MessageStreamValidation) {
- private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+ private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf())
fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
@@ -42,24 +41,42 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
throw IllegalArgumentException(message)
}
- logger.info { "Received new configuration. Creating consumer for topics: $topics" }
- consumerState.set(consumerFactory.createConsumerForTopics(topics))
+ logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" }
+ synchronized(consumerState) {
+ consumerState.clear()
+ consumerState.putAll(consumerFactory.createConsumersForTopics(topics))
+ }
}
- fun state() = consumerState.getOption().map { it.currentState() }
+ fun state(topic: String) =
+ consumerState(topic)
+ .map(ConsumerStateProvider::currentState)
+ .toEither {
+ val message = "Failed to return consumer state. No consumer found for topic: $topic"
+ logger.warn { message }
+ MissingConsumerException(message)
+ }
+
+ fun resetState(topic: String) =
+ consumerState(topic)
+ .map { it.reset() }
+ .toEither {
+ val message = "Failed to reset consumer state. No consumer found for topic: $topic"
+ logger.warn { message }
+ MissingConsumerException(message)
+ }
- fun resetState() = consumerState.getOption().fold({ }, { it.reset() })
+ fun validate(jsonDescription: InputStream, topic: String) =
+ messageStreamValidation.validate(jsonDescription, currentMessages(topic))
+ private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic])
- fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages())
- private fun currentMessages(): List<ByteArray> =
- consumerState.getOption()
- .map { it.currentState().consumedMessages }
- .getOrElse(::emptyList)
+ private fun currentMessages(topic: String): List<ByteArray> =
+ state(topic).fold({ emptyList() }, { it.consumedMessages })
private fun extractTopics(topicsString: String): Set<String> =
- topicsString.substringAfter("=")
+ topicsString.removeSurrounding("\"")
.split(",")
.toSet()
@@ -67,3 +84,5 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
private val logger = Logger(DcaeAppSimulator::class)
}
}
+
+class MissingConsumerException(message: String) : Throwable(message)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index f3fd56bb..6a09be9f 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -26,11 +27,13 @@ import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
import org.onap.dcae.collectors.veshv.utils.http.Response
import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.Responses.stringResponse
import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
import org.onap.dcae.collectors.veshv.utils.http.sendOrError
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerRoutes
import java.net.InetSocketAddress
@@ -39,20 +42,6 @@ import java.net.InetSocketAddress
* @since May 2018
*/
class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
- private val responseValid by lazy {
- Responses.statusResponse(
- name = "valid",
- message = VALID_RESPONSE_MESSAGE
- )
- }
-
- private val responseInvalid by lazy {
- Responses.statusResponse(
- name = "invalid",
- message = INVALID_RESPONSE_MESSAGE,
- httpStatus = HttpStatus.BAD_REQUEST
- )
- }
fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
Mono.defer {
@@ -74,37 +63,49 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
res.sendOrError { simulator.listenToTopics(it) }
}
}
- .delete("/messages") { _, res ->
- logger.info { "Resetting simulator state" }
+ .delete("/messages/{$TOPIC_PARAM_KEY}") { req, res ->
+ doWithTopicOrReturnInternalErrorResponse(req) {
+ logger.info { "Resetting simulator state for topic $it" }
+ simulator.resetState(it)
+ Mono.just(Responses.Success)
+ }.let(res::sendAndHandleErrors)
- res
- .header("Content-type", CONTENT_TEXT)
- .sendOrError { simulator.resetState() }
}
- .get("/messages/all/count") { _, res ->
- logger.info { "Processing request for count of received messages" }
- simulator.state().fold(
- {
- logger.warn { "Error - number of messages could not be specified" }
- res.status(HttpConstants.STATUS_NOT_FOUND)
- },
- {
- logger.info { "Returned number of received messages: ${it.messagesCount}" }
- res.sendString(Mono.just(it.messagesCount.toString()))
- }
- )
+ .get("/messages/{$TOPIC_PARAM_KEY}/count") { req, res ->
+ doWithTopicOrReturnInternalErrorResponse(req) {
+ logger.info { "Processing request for count of received messages for topic $it" }
+ simulator.state(it)
+ .fold({
+ val errorMessage = { COUNT_NOT_RESOLVED_MESSAGE + ". Reason: ${it.message}" }
+ logger.warn(errorMessage)
+ Mono.just(Responses.statusResponse(
+ name = "Count not found",
+ message = errorMessage(),
+ httpStatus = HttpStatus.NOT_FOUND
+ )
+ )
+ }, {
+ logger.info { "Returned number of received messages: ${it.messagesCount}" }
+ Mono.just(
+ Responses.stringResponse(
+ message = it.messagesCount.toString(),
+ httpStatus = HttpStatus.OK
+ )
+ )
+ })
+ }.let(res::sendAndHandleErrors)
}
- .post("/messages/all/validate") { req, res ->
+ .post("/messages/{$TOPIC_PARAM_KEY}/validate") { req, res ->
req
.receive().aggregate().asInputStream()
- .map {
- logger.info { "Processing request for message validation" }
- simulator.validate(it)
- .map(::resolveValidationResponse)
- }
- .flatMap {
- res.sendAndHandleErrors(it)
+ .map { inputStream ->
+ doWithTopicOrReturnInternalErrorResponse(req) {
+ logger.info { "Processing request for message validation" }
+ simulator.validate(inputStream, it)
+ .map(::resolveValidationResponse)
+ }
}
+ .flatMap(res::sendAndHandleErrors)
}
.get("/healthcheck") { _, res ->
val status = HttpConstants.STATUS_OK
@@ -113,6 +114,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
}
}
+ private fun doWithTopicOrReturnInternalErrorResponse(req: HttpServerRequest,
+ topicConsumer: (String) -> Mono<Response>) =
+ Option.fromNullable(req.param(TOPIC_PARAM_KEY))
+ .fold({
+ Mono.just(
+ stringResponse("Failed to retrieve parameter from url",
+ HttpStatus.INTERNAL_SERVER_ERROR))
+ }, topicConsumer)
+
private fun resolveValidationResponse(isValid: Boolean): Response =
if (isValid) {
logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
@@ -124,10 +134,26 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
companion object {
- private const val CONTENT_TEXT = "text/plain"
+ private val logger = Logger(DcaeAppApiServer::class)
private const val VALID_RESPONSE_MESSAGE = "validation passed"
private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request"
- private val logger = Logger(DcaeAppApiServer::class)
+ private const val COUNT_NOT_RESOLVED_MESSAGE = "Error - number of messages could not be specified"
+ private const val TOPIC_PARAM_KEY = "topic"
+
+ private val responseValid by lazy {
+ Responses.statusResponse(
+ name = "valid",
+ message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE
+ )
+ }
+
+ private val responseInvalid by lazy {
+ Responses.statusResponse(
+ name = "invalid",
+ message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE,
+ httpStatus = HttpStatus.BAD_REQUEST
+ )
+ }
}
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index 3314805e..0fd3bb10 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -21,10 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
+import reactor.kafka.receiver.ReceiverRecord
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -32,10 +33,9 @@ import reactor.kafka.receiver.ReceiverOptions
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- fun start() = Consumer()
- .also { consumer ->
- receiver.receive().map(consumer::update).subscribe()
- }
+ fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
+ receiver.receive()
+ .also { logger.info { "Started Kafka source" } }
companion object {
private val logger = Logger(KafkaSource::class)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 725248ce..a6d1eddb 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -62,6 +62,19 @@ class Consumer : ConsumerStateProvider {
}
class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer =
- KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
+ fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> =
+ KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
+ val topicToConsumer = kafkaTopics.associate { it to Consumer() }
+ kafkaSource.start()
+ .map {
+ val topic = it.topic()
+ topicToConsumer.get(topic)?.update(it)
+ ?: logger.warn { "No consumer configured for topic $topic" }
+ }.subscribe()
+ topicToConsumer
+ }
+
+ companion object {
+ private val logger = Logger(ConsumerFactory::class)
+ }
}