diff options
author | kjaniak <kornel.janiak@nokia.com> | 2019-06-19 08:39:33 +0200 |
---|---|---|
committer | kjaniak <kornel.janiak@nokia.com> | 2019-06-25 15:52:13 +0200 |
commit | 7808010c1a18531ee9b618f934d31816193cac38 (patch) | |
tree | 3680678be6f909054ff3a022aeda35b1ee5e47ac /sources/hv-collector-dcae-app-simulator/src | |
parent | e52b0ef6c1c3b502f840f08a17b6594e17957db1 (diff) |
Implement message counting in consumer
Issue-ID: DCAEGEN2-1635
Change-Id: I2666de7bad27052d9cefa0f687ad0772d4c9a95d
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src')
7 files changed, 26 insertions, 168 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 122d9bf0..beacfd79 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -28,9 +28,9 @@ import java.util.Collections.synchronizedMap * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation) { - private val consumerState: MutableMap<String, ConsumerStateProvider> = synchronizedMap(mutableMapOf()) +internal class DcaeAppSimulator(private val consumerFactory: DcaeAppConsumerFactory, + private val messageStreamValidation: MessageStreamValidation) { + private val consumers: MutableMap<String, Consumer> = synchronizedMap(mutableMapOf()) fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) @@ -42,9 +42,9 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, } logger.info { "Received new configuration. Removing old consumers and creating consumers for topics: $topics" } - synchronized(consumerState) { - consumerState.clear() - consumerState.putAll(consumerFactory.createConsumersForTopics(topics)) + synchronized(consumers) { + consumers.clear() + consumers.putAll(consumerFactory.createConsumersFor(topics)) } } @@ -69,7 +69,7 @@ internal class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, fun validate(jsonDescription: InputStream, topic: String) = messageStreamValidation.validate(jsonDescription, currentMessages(topic)) - private fun consumerState(topic: String) = Option.fromNullable(consumerState[topic]) + private fun consumerState(topic: String) = Option.fromNullable(consumers[topic]) private fun currentMessages(topic: String): List<ByteArray> = diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt index 2458b203..992be6e3 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt @@ -143,14 +143,14 @@ internal class DcaeAppApiServer(private val simulator: DcaeAppSimulator) { private val responseValid by lazy { Responses.statusResponse( name = "valid", - message = DcaeAppApiServer.VALID_RESPONSE_MESSAGE + message = VALID_RESPONSE_MESSAGE ) } private val responseInvalid by lazy { Responses.statusResponse( name = "invalid", - message = DcaeAppApiServer.INVALID_RESPONSE_MESSAGE, + message = INVALID_RESPONSE_MESSAGE, httpStatus = HttpStatus.BAD_REQUEST ) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt deleted file mode 100644 index a108eba7..00000000 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018,2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import reactor.kafka.receiver.KafkaReceiver -import reactor.kafka.receiver.ReceiverOptions -import reactor.kafka.receiver.ReceiverRecord - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - - fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> = - receiver.receive() - .doOnNext { it.receiverOffset().acknowledge() } - .also { logger.info { "Started Kafka source" } } - - companion object { - private val logger = Logger(KafkaSource::class) - - private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" - private const val USERNAME = "admin" - private const val PASSWORD = "admin_secret" - private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" - private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name - - fun create(bootstrapServers: String, topics: Set<String>) = - KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) - - fun createReceiverOptions(bootstrapServers: String, - topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { - val props = mapOf<String, Any>( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator", - ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", - ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000", - - - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, - SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, - SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG - ) - return ReceiverOptions.create<ByteArray, ByteArray>(props) - .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } - .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } } - .subscription(topics) - } - } -} diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 2de89aae..8a7aafbe 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -19,9 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory +import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.kafka.receiver.ReceiverRecord import java.util.concurrent.ConcurrentLinkedQueue /** @@ -40,10 +41,9 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr internal interface ConsumerStateProvider { fun currentState(): ConsumerState - fun reset() } -internal class Consumer : ConsumerStateProvider { +internal class Consumer : KafkaConsumer, ConsumerStateProvider { private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() @@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider { override fun reset() = consumedMessages.clear() - fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + override fun update(record: ConsumerRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } consumedMessages.add(record.value()) } @@ -61,20 +61,11 @@ internal class Consumer : ConsumerStateProvider { } } -internal class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> - val topicToConsumer = kafkaTopics.associate { it to Consumer() } - kafkaSource.start() - .map { - val topic = it.topic() - topicToConsumer.get(topic)?.update(it) - ?: logger.warn { "No consumer configured for topic $topic" } - }.subscribe() - topicToConsumer - } +internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) { - companion object { - private val logger = Logger(ConsumerFactory::class) - } + private val consumerProvider = { Consumer() } + + fun createConsumersFor(topics: Set<String>) = + ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider) + .mapValues { it.value as Consumer } } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index 7f4e62bb..25178594 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried -import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer @@ -43,7 +43,7 @@ fun main(args: Array<String>): Unit = private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess { logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" } - val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val consumerFactory = DcaeAppConsumerFactory(config.kafkaBootstrapServers) val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes) val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator()) DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index e3e61c81..4ebfb469 100644 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -46,7 +46,7 @@ import kotlin.test.assertFailsWith * @since August 2018 */ internal class DcaeAppSimulatorTest : Spek({ - lateinit var consumerFactory: ConsumerFactory + lateinit var consumerFactory: DcaeAppConsumerFactory lateinit var messageStreamValidation: MessageStreamValidation lateinit var perf3gpp_consumer: Consumer lateinit var faults_consumer: Consumer @@ -59,7 +59,7 @@ internal class DcaeAppSimulatorTest : Spek({ faults_consumer = mock() cut = DcaeAppSimulator(consumerFactory, messageStreamValidation) - whenever(consumerFactory.createConsumersForTopics(anySet())).thenReturn(mapOf( + whenever(consumerFactory.createConsumersFor(anySet())).thenReturn(mapOf( PERF3GPP_TOPIC to perf3gpp_consumer, FAULTS_TOPICS to faults_consumer)) } @@ -81,12 +81,12 @@ internal class DcaeAppSimulatorTest : Spek({ it("should subscribe to given topics") { cut.listenToTopics(TWO_TOPICS) - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } it("should subscribe to given topics when called with comma separated list") { cut.listenToTopics("$PERF3GPP_TOPIC,$FAULTS_TOPICS") - verify(consumerFactory).createConsumersForTopics(TWO_TOPICS) + verify(consumerFactory).createConsumersFor(TWO_TOPICS) } } diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt deleted file mode 100644 index de74f628..00000000 --- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters - -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> - * @since August 2018 - */ -internal class KafkaSourceTest : Spek({ - val servers = "kafka1:9080,kafka2:9080" - val topics = setOf("topic1", "topic2") - - describe("receiver options") { - val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable() - - fun verifyProperty(key: String, expectedValue: Any) { - it("should have $key option set") { - assertThat(options.consumerProperty(key)) - .isEqualTo(expectedValue) - } - } - - verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) - verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator") - verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators") - verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java) - verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - } -})
\ No newline at end of file |