aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-04-05 08:44:52 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-04-10 09:05:59 +0200
commitf864f9ae0cf8cef72b64cbda8964e86398b3f749 (patch)
treef5d45cda36f4ebe5f5466a52a459c0ac7976773b /sources/hv-collector-dcae-app-simulator
parentc9829d23c12b2824a0d56ee6efbd00ad67b9046e (diff)
Allow retrieving multiple kafka topics status
Change-Id: I5e8433873e5d594e6df9da8c4893b0f54614efae Issue-ID: DCAEGEN2-1399 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt47
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt108
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt10
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt17
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt6
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt79
6 files changed, 166 insertions, 101 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 93c12d25..33e9a37e 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -19,11 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.getOrElse
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.io.InputStream
-import java.util.concurrent.atomic.AtomicReference
+import java.util.Collections.synchronizedMap
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference
*/
class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
private val messageStreamValidation: MessageStreamValidation) {
- private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+ private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf())
fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
@@ -42,24 +41,42 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
throw IllegalArgumentException(message)
}
- logger.info { "Received new configuration. Creating consumer for topics: $topics" }
- consumerState.set(consumerFactory.createConsumerForTopics(topics))
+ logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" }
+ synchronized(consumerState) {
+ consumerState.clear()
+ consumerState.putAll(consumerFactory.createConsumersForTopics(topics))
+ }
}
- fun state() = consumerState.getOption().map { it.currentState() }
+ fun state(topic: String) =
+ consumerState(topic)
+ .map(ConsumerStateProvider::currentState)
+ .toEither {
+ val message = "Failed to return consumer state. No consumer found for topic: $topic"
+ logger.warn { message }
+ MissingConsumerException(message)
+ }
+
+ fun resetState(topic: String) =
+ consumerState(topic)
+ .map { it.reset() }
+ .toEither {
+ val message = "Failed to reset consumer state. No consumer found for topic: $topic"
+ logger.warn { message }
+ MissingConsumerException(message)
+ }
- fun resetState() = consumerState.getOption().fold({ }, { it.reset() })
+ fun validate(jsonDescription: InputStream, topic: String) =
+ messageStreamValidation.validate(jsonDescription, currentMessages(topic))
+ private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic])
- fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages())
- private fun currentMessages(): List<ByteArray> =
- consumerState.getOption()
- .map { it.currentState().consumedMessages }
- .getOrElse(::emptyList)
+ private fun currentMessages(topic: String): List<ByteArray> =
+ state(topic).fold({ emptyList() }, { it.consumedMessages })
private fun extractTopics(topicsString: String): Set<String> =
- topicsString.substringAfter("=")
+ topicsString.removeSurrounding("\"")
.split(",")
.toSet()
@@ -67,3 +84,5 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
private val logger = Logger(DcaeAppSimulator::class)
}
}
+
+class MissingConsumerException(message: String) : Throwable(message)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
index f3fd56bb..6a09be9f 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -26,11 +27,13 @@ import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
import org.onap.dcae.collectors.veshv.utils.http.Response
import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.Responses.stringResponse
import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
import org.onap.dcae.collectors.veshv.utils.http.sendOrError
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerRoutes
import java.net.InetSocketAddress
@@ -39,20 +42,6 @@ import java.net.InetSocketAddress
* @since May 2018
*/
class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
- private val responseValid by lazy {
- Responses.statusResponse(
- name = "valid",
- message = VALID_RESPONSE_MESSAGE
- )
- }
-
- private val responseInvalid by lazy {
- Responses.statusResponse(
- name = "invalid",
- message = INVALID_RESPONSE_MESSAGE,
- httpStatus = HttpStatus.BAD_REQUEST
- )
- }
fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
Mono.defer {
@@ -74,37 +63,49 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
res.sendOrError { simulator.listenToTopics(it) }
}
}
- .delete("/messages") { _, res ->
- logger.info { "Resetting simulator state" }
+ .delete("/messages/{$TOPIC_PARAM_KEY}") { req, res ->
+ doWithTopicOrReturnInternalErrorResponse(req) {
+ logger.info { "Resetting simulator state for topic $it" }
+ simulator.resetState(it)
+ Mono.just(Responses.Success)
+ }.let(res::sendAndHandleErrors)
- res
- .header("Content-type", CONTENT_TEXT)
- .sendOrError { simulator.resetState() }
}
- .get("/messages/all/count") { _, res ->
- logger.info { "Processing request for count of received messages" }
- simulator.state().fold(
- {
- logger.warn { "Error - number of messages could not be specified" }
- res.status(HttpConstants.STATUS_NOT_FOUND)
- },
- {
- logger.info { "Returned number of received messages: ${it.messagesCount}" }
- res.sendString(Mono.just(it.messagesCount.toString()))
- }
- )
+ .get("/messages/{$TOPIC_PARAM_KEY}/count") { req, res ->
+ doWithTopicOrReturnInternalErrorResponse(req) {
+ logger.info { "Processing request for count of received messages for topic $it" }
+ simulator.state(it)
+ .fold({
+ val errorMessage = { COUNT_NOT_RESOLVED_MESSAGE + ". Reason: ${it.message}" }
+ logger.warn(errorMessage)
+ Mono.just(Responses.statusResponse(
+ name = "Count not found",
+ message = errorMessage(),
+ httpStatus = HttpStatus.NOT_FOUND
+ )
+ )
+ }, {
+ logger.info { "Returned number of received messages: ${it.messagesCount}" }
+ Mono.just(
+ Responses.stringResponse(
+ message = it.messagesCount.toString(),
+ httpStatus = HttpStatus.OK
+ )
+ )
+ })
+ }.let(res::sendAndHandleErrors)
}
- .post("/messages/all/validate") { req, res ->
+ .post("/messages/{$TOPIC_PARAM_KEY}/validate") { req, res ->
req
.receive().aggregate().asInputStream()
- .map {
- logger.info { "Processing request for message validation" }
- simulator.validate(it)
- .map(::resolveValidationResponse)
- }
- .flatMap {
- res.sendAndHandleErrors(it)
+ .map { inputStream ->
+ doWithTopicOrReturnInternalErrorResponse(req) {
+ logger.info { "Processing request for message validation" }
+ simulator.validate(inputStream, it)
+ .map(::resolveValidationResponse)
+ }
}
+ .flatMap(res::sendAndHandleErrors)
}
.get("/healthcheck") { _, res ->
val status = HttpConstants.STATUS_OK
@@ -113,6 +114,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
}
}
+ private fun doWithTopicOrReturnInternalErrorResponse(req: HttpServerRequest,
+ topicConsumer: (String) -> Mono<Response>) =
+ Option.fromNullable(req.param(TOPIC_PARAM_KEY))
+ .fold({
+ Mono.just(
+ stringResponse("Failed to retrieve parameter from url",
+ HttpStatus.INTERNAL_SERVER_ERROR))
+ }, topicConsumer)
+
private fun resolveValidationResponse(isValid: Boolean): Response =
if (isValid) {
logger.info { "Comparison result: $VALID_RESPONSE_MESSAGE" }
@@ -124,10 +134,26 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
companion object {
- private const val CONTENT_TEXT = "text/plain"
+ private val logger = Logger(DcaeAppApiServer::class)
private const val VALID_RESPONSE_MESSAGE = "validation passed"
private const val INVALID_RESPONSE_MESSAGE = "consumed messages don't match data from validation request"
- private val logger = Logger(DcaeAppApiServer::class)
+ private const val COUNT_NOT_RESOLVED_MESSAGE = "Error - number of messages could not be specified"
+ private const val TOPIC_PARAM_KEY = "topic"
+
+ private val responseValid by lazy {
+ Responses.statusResponse(
+ name = "valid",
+ message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE
+ )
+ }
+
+ private val responseInvalid by lazy {
+ Responses.statusResponse(
+ name = "invalid",
+ message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE,
+ httpStatus = HttpStatus.BAD_REQUEST
+ )
+ }
}
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index 3314805e..0fd3bb10 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -21,10 +21,11 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
+import reactor.kafka.receiver.ReceiverRecord
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -32,10 +33,9 @@ import reactor.kafka.receiver.ReceiverOptions
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- fun start() = Consumer()
- .also { consumer ->
- receiver.receive().map(consumer::update).subscribe()
- }
+ fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
+ receiver.receive()
+ .also { logger.info { "Started Kafka source" } }
companion object {
private val logger = Logger(KafkaSource::class)
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 725248ce..a6d1eddb 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -62,6 +62,19 @@ class Consumer : ConsumerStateProvider {
}
class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer =
- KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
+ fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> =
+ KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
+ val topicToConsumer = kafkaTopics.associate { it to Consumer() }
+ kafkaSource.start()
+ .map {
+ val topic = it.topic()
+ topicToConsumer.get(topic)?.update(it)
+ ?: logger.warn { "No consumer configured for topic $topic" }
+ }.subscribe()
+ topicToConsumer
+ }
+
+ companion object {
+ private val logger = Logger(ConsumerFactory::class)
+ }
}
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
index e8ac6cd5..a594215b 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
@@ -67,16 +67,16 @@ internal class ConsumerTest : Spek({
}
})
-fun assertEmptyState(cut: Consumer) {
+private fun assertEmptyState(cut: Consumer) {
assertState(cut)
}
-fun assertState(cut: Consumer, vararg values: ByteArray) {
+private fun assertState(cut: Consumer, vararg values: ByteArray) {
assertThat(cut.currentState().consumedMessages)
.containsOnly(*values)
assertThat(cut.currentState().messagesCount)
.isEqualTo(values.size)
}
-fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index 493100fc..e3e61c81 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -19,8 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.None
-import arrow.core.Some
+import arrow.core.Right
import com.google.protobuf.ByteString
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
@@ -41,6 +40,7 @@ import java.lang.IllegalArgumentException
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.test.assertFailsWith
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
@@ -48,82 +48,87 @@ import kotlin.test.assertFailsWith
internal class DcaeAppSimulatorTest : Spek({
lateinit var consumerFactory: ConsumerFactory
lateinit var messageStreamValidation: MessageStreamValidation
- lateinit var consumer: Consumer
+ lateinit var perf3gpp_consumer: Consumer
+ lateinit var faults_consumer: Consumer
lateinit var cut: DcaeAppSimulator
beforeEachTest {
consumerFactory = mock()
messageStreamValidation = mock()
- consumer = mock()
+ perf3gpp_consumer = mock()
+ faults_consumer = mock()
cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
- whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer)
+ whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf(
+ PERF3GPP_TOPIC to perf3gpp_consumer,
+ FAULTS_TOPICS to faults_consumer))
}
fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
describe("listenToTopics") {
- val topics = setOf("perf3gpp", "faults")
-
it("should fail when topic list is empty") {
- assertFailsWith(IllegalArgumentException::class){
+ assertFailsWith(IllegalArgumentException::class) {
cut.listenToTopics(setOf())
}
}
it("should fail when topic list contains empty strings") {
- assertFailsWith(IllegalArgumentException::class){
- cut.listenToTopics(setOf("perf3gpp", " ", "faults"))
+ assertFailsWith(IllegalArgumentException::class) {
+ cut.listenToTopics(setOf(PERF3GPP_TOPIC, " ", FAULTS_TOPICS))
}
}
it("should subscribe to given topics") {
- cut.listenToTopics(topics)
- verify(consumerFactory).createConsumerForTopics(topics)
+ cut.listenToTopics(TWO_TOPICS)
+ verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
}
it("should subscribe to given topics when called with comma separated list") {
- cut.listenToTopics("perf3gpp,faults")
- verify(consumerFactory).createConsumerForTopics(topics)
+ cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS")
+ verify(consumerFactory).createConsumersForTopics(TWO_TOPICS)
}
}
describe("state") {
- it("should return None when topics hasn't been initialized") {
- assertThat(cut.state()).isEqualTo(None)
+ it("should return Left when topics hasn't been initialized") {
+ assertThat(cut.state(PERF3GPP_TOPIC).isLeft()).isTrue()
}
describe("when topics are initialized") {
beforeEachTest {
- cut.listenToTopics("perf3gpp")
+ cut.listenToTopics(TWO_TOPICS)
}
- it("should return some state when it has been set") {
+ it("should return state when it has been set") {
val state = consumerState()
- whenever(consumer.currentState()).thenReturn(state)
+ whenever(perf3gpp_consumer.currentState()).thenReturn(state)
+ whenever(faults_consumer.currentState()).thenReturn(state)
- assertThat(cut.state()).isEqualTo(Some(state))
+ assertThat(cut.state(PERF3GPP_TOPIC)).isEqualTo(Right(state))
+ assertThat(cut.state(FAULTS_TOPICS)).isEqualTo(Right(state))
}
}
}
describe("resetState") {
it("should do nothing when topics hasn't been initialized") {
- cut.resetState()
- verify(consumer, never()).reset()
+ cut.resetState(PERF3GPP_TOPIC)
+ cut.resetState(FAULTS_TOPICS)
+ verify(perf3gpp_consumer, never()).reset()
+ verify(faults_consumer, never()).reset()
}
describe("when topics are initialized") {
beforeEachTest {
- cut.listenToTopics("perf3gpp")
+ cut.listenToTopics(TWO_TOPICS)
}
- it("should reset the state") {
- // when
- cut.resetState()
+ it("should reset the state of given topic consumer") {
+ cut.resetState(PERF3GPP_TOPIC)
- // then
- verify(consumer).reset()
+ verify(perf3gpp_consumer).reset()
+ verify(faults_consumer, never()).reset()
}
}
}
@@ -135,7 +140,7 @@ internal class DcaeAppSimulatorTest : Spek({
it("should use empty list when consumer is unavailable") {
StepVerifier
- .create(cut.validate("['The JSON']".byteInputStream()))
+ .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
.expectNext(true)
.verifyComplete()
@@ -143,22 +148,24 @@ internal class DcaeAppSimulatorTest : Spek({
}
it("should delegate to MessageStreamValidation") {
- // given
- cut.listenToTopics("perf3gpp")
- whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
+ cut.listenToTopics(PERF3GPP_TOPIC)
+ whenever(perf3gpp_consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
- StepVerifier
- .create(cut.validate("['The JSON']".byteInputStream()))
- .expectNext(true)
+ StepVerifier
+ .create(cut.validate("['The JSON']".byteInputStream(), PERF3GPP_TOPIC))
+ .expectNext(true)
.verifyComplete()
- // then
verify(messageStreamValidation).validate(any(), any())
}
}
})
+private const val PERF3GPP_TOPIC = "perf3gpp"
+private const val FAULTS_TOPICS = "faults"
+private val TWO_TOPICS = setOf(PERF3GPP_TOPIC, FAULTS_TOPICS)
+
private const val DUMMY_EVENT_ID = "aaa"
private const val DUMMY_PAYLOAD = "payload"