From 4128aa2c9368ed20fab92e8c0df83f14d6233b86 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 18 Dec 2018 15:58:56 +0100 Subject: There should be one KafkaSender per configuration We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/boundary/adapters.kt | 9 +-- .../collectors/veshv/factory/CollectorFactory.kt | 18 +++--- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 17 +++++- .../veshv/impl/adapters/AdapterFactory.kt | 9 ++- .../impl/adapters/ConsulConfigurationProvider.kt | 8 +-- .../veshv/impl/adapters/LoggingSinkProvider.kt | 12 ++-- .../veshv/impl/adapters/kafka/KafkaSink.kt | 68 ++++++++++------------ .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 38 ++++++++---- .../veshv/model/CollectorConfiguration.kt | 2 +- .../collectors/veshv/model/KafkaConfiguration.kt | 26 +++++++++ .../collectors/veshv/model/ServerConfiguration.kt | 1 + .../veshv/model/SuccessfullyConsumedMessage.kt | 33 +++++++++++ .../veshv/model/stream_interruption_cause.kt | 3 +- .../adapters/ConsulConfigurationProviderTest.kt | 7 +-- .../impl/adapters/kafka/KafkaSinkProviderTest.kt | 64 ++++++++++++++++++++ 15 files changed, 233 insertions(+), 82 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt create mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt (limited to 'sources/hv-collector-core') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index ac55e55f..e4a73947 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux interface Sink { - fun send(messages: Flux): Flux + fun send(messages: Flux): Flux } interface Metrics { @@ -41,14 +42,14 @@ interface Metrics { fun notifyClientRejected(cause: ClientRejectionCause) } -@FunctionalInterface interface SinkProvider { - operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink + operator fun invoke(ctx: ClientContext): Sink companion object { fun just(sink: Sink): SinkProvider = object : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink + override fun invoke( + ctx: ClientContext): Sink = sink } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2008fc35..fe2b89d5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -59,18 +59,20 @@ class CollectorFactory(val configuration: ConfigurationProvider, healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(config::set) + return { ctx: ClientContext -> - config.getOption().map { config -> createVesHvCollector(config, ctx) } + config.getOption().map { createVesHvCollector(it, ctx) } } } - private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector( - clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), - protobufDecoder = VesDecoder(), - router = Router(config.routing, ctx), - sink = sinkProvider(config, ctx), - metrics = metrics) + private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = + VesHvCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(config.routing, ctx), + sink = sinkProvider(ctx), + metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 5c3f339c..fd01c9d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -28,9 +28,11 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleR import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure @@ -96,10 +98,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux): Flux = flux + private fun routeMessage(flux: Flux): Flux = flux .flatMap(this::findRoute) .compose(sink::send) - .doOnNext(metrics::notifyMessageSent) + .doOnNext(this::updateSinkMetrics) private fun findRoute(msg: VesMessage) = router .findDestination(msg) @@ -108,6 +110,15 @@ internal class VesHvCollector( { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) + private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { + when (consumedMessage) { + is SuccessfullyConsumedMessage -> + metrics.notifyMessageSent(consumedMessage.message) + is FailedToConsumeMessage -> + metrics.notifyMessageDropped(consumedMessage.cause) + } + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8c16736d..75b6f0a6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import reactor.netty.http.client.HttpClient /** @@ -30,8 +31,12 @@ import reactor.netty.http.client.HttpClient * @since May 2018 */ object AdapterFactory { - fun kafkaSink(): SinkProvider = KafkaSinkProvider() - fun loggingSink(): SinkProvider = LoggingSinkProvider() + fun sinkCreatorFactory(dummyMode: Boolean, + kafkaConfig: KafkaConfiguration): SinkProvider = + if (dummyMode) + LoggingSinkProvider() + else + KafkaSinkProvider(kafkaConfig) fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index e4453c90..717da092 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Flux @@ -107,12 +108,11 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - val routing = configuration.getJsonArray("collector.routing") + val routingArray = configuration.getJsonArray("collector.routing") return CollectorConfiguration( - kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"), - routing = org.onap.dcae.collectors.veshv.model.routing { - for (route in routing) { + routing { + for (route in routingArray) { val routeObj = route.asJsonObject() defineRoute { fromDomain(routeObj.getString("fromDomain")) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 14966d9b..7535fbee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicLong @@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + override fun invoke(ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux): Flux = - messages - .doOnNext(this::logMessage) + override fun send(messages: Flux): Flux = + messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage) private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index b4f9a90c..73c852d6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,19 +20,20 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux -import reactor.core.publisher.Mono import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord -import reactor.kafka.sender.SenderResult -import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk @@ -40,44 +41,39 @@ import java.util.concurrent.atomic.AtomicLong */ internal class KafkaSink(private val sender: KafkaSender, private val ctx: ClientContext) : Sink { - private val sentMessages = AtomicLong(0) - override fun send(messages: Flux): Flux { - val records = messages.map(this::vesToKafkaRecord) - val result = sender.send(records) - .doOnNext { - if (it.isSuccessful()) { - Mono.just(it) + override fun send(messages: Flux): Flux = + messages.map(::vesToKafkaRecord).let { records -> + sender.send(records).map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) } else { - logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) } - Mono.empty>() + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } } - .map { it.correlationMetadata() } - - return result.doOnNext(::logSentMessage) - } + } - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord { - return SenderRecord.create( - msg.topic, - msg.partition, - System.currentTimeMillis(), - msg.message.header, - msg.message, - msg) - } - - private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - val msgNum = sentMessages.incrementAndGet() - "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" - } - } + private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord = + SenderRecord.create( + routed.topic, + routed.partition, + FILL_TIMESTAMP_LATER, + routed.message.header, + routed.message, + routed) - private fun SenderResult.isSuccessful() = exception() == null + internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender companion object { - val logger = Logger(KafkaSink::class) + private val FILL_TIMESTAMP_LATER: Long? = null + private val logger = Logger(KafkaSink::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index b4f470d4..2fa4f545 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,11 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION +import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender @@ -33,14 +38,25 @@ import reactor.kafka.sender.SenderOptions * @author Piotr Jaszczyk * @since June 2018 */ -internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx) - } +internal class KafkaSinkProvider internal constructor( + private val kafkaSender: KafkaSender) : SinkProvider { + + constructor(config: KafkaConfiguration) : this(constructKafkaSender(config)) + + override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create() - .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + companion object { + private fun constructKafkaSender(config: KafkaConfiguration) = + KafkaSender.create(constructSenderOptions(config)) + + private fun constructSenderOptions(config: KafkaConfiguration) = + SenderOptions.create() + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) + .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(RETRIES_CONFIG, 1) + .producerProperty(ACKS_CONFIG, "1") + .stopOnError(false) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt index ec546c7d..f65b97ca 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt @@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model * @author Piotr Jaszczyk * @since May 2018 */ -data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing) +data class CollectorConfiguration(val routing: Routing) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt new file mode 100644 index 00000000..f65e157d --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +data class KafkaConfiguration(val bootstrapServers: String) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 85117684..7e5044f9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -29,6 +29,7 @@ import java.time.Duration */ data class ServerConfiguration( val serverListenAddress: InetSocketAddress, + val kafkaConfiguration: KafkaConfiguration, val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt new file mode 100644 index 00000000..29c418a4 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import java.lang.Exception + +sealed class ConsumedMessage { + abstract val message: RoutedMessage +} + +data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage() + +data class FailedToConsumeMessage( + override val message: RoutedMessage, + val exception: Exception?, + val cause: MessageDropCause) : ConsumedMessage() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt index 836eab53..ab7b196a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt @@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException */ enum class MessageDropCause(val tag: String) { ROUTE_NOT_FOUND("routing"), - INVALID_MESSAGE("invalid") + INVALID_MESSAGE("invalid"), + KAFKA_FAILURE("kafka") } enum class ClientRejectionCause(val tag: String) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 9ce0c3db..a92d3763 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({ StepVerifier.create(consulConfigProvider().take(1)) .consumeNextWith { - assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) - val route1 = it.routing.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") @@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String, ) } - -const val kafkaAddress = "message-router-kafka" - fun constructConsulResponse(): String = """{ - "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", + "whatever": "garbage", "collector.routing": [ { "fromDomain": "fault", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt new file mode 100644 index 00000000..3a924e48 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import arrow.syntax.collections.tail +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +internal object KafkaSinkProviderTest : Spek({ + describe("non functional requirements") { + given("sample configuration") { + val config = KafkaConfiguration("localhost:9090") + val cut = KafkaSinkProvider(config) + + on("sample clients") { + val clients = listOf( + ClientContext(), + ClientContext(), + ClientContext(), + ClientContext()) + + it("should create only one instance of KafkaSender") { + val sinks = clients.map(cut::invoke) + val firstSink = sinks[0] + val restOfSinks = sinks.tail() + + assertThat(restOfSinks).isNotEmpty + assertThat(restOfSinks).allSatisfy { sink -> + assertThat(firstSink.usesSameSenderAs(sink)) + .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") + .isTrue() + } + } + } + } + } +}) -- cgit 1.2.3-korg