aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-18 15:58:56 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-20 14:57:25 +0100
commit4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch)
treecff4cf2428a288b7b86830f282b81d41a41ad250 /sources/hv-collector-core
parent4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff)
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt12
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt68
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt38
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt26
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt3
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt7
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt64
15 files changed, 233 insertions, 82 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index ac55e55f..e4a73947 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+ fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
interface Metrics {
@@ -41,14 +42,14 @@ interface Metrics {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-@FunctionalInterface
interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
+ operator fun invoke(ctx: ClientContext): Sink
companion object {
fun just(sink: Sink): SinkProvider =
object : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
+ override fun invoke(
+ ctx: ClientContext): Sink = sink
}
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2008fc35..fe2b89d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -59,18 +59,20 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
+
return { ctx: ClientContext ->
- config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
}
}
- private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
- clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
- protobufDecoder = VesDecoder(),
- router = Router(config.routing, ctx),
- sink = sinkProvider(config, ctx),
- metrics = metrics)
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+ VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 5c3f339c..fd01c9d8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -28,9 +28,11 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleR
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
@@ -96,10 +98,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext(metrics::notifyMessageSent)
+ .doOnNext(this::updateSinkMetrics)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
@@ -108,6 +110,15 @@ internal class VesHvCollector(
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
+ private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
+ when (consumedMessage) {
+ is SuccessfullyConsumedMessage ->
+ metrics.notifyMessageSent(consumedMessage.message)
+ is FailedToConsumeMessage ->
+ metrics.notifyMessageDropped(consumedMessage.cause)
+ }
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8c16736d..75b6f0a6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import reactor.netty.http.client.HttpClient
/**
@@ -30,8 +31,12 @@ import reactor.netty.http.client.HttpClient
* @since May 2018
*/
object AdapterFactory {
- fun kafkaSink(): SinkProvider = KafkaSinkProvider()
- fun loggingSink(): SinkProvider = LoggingSinkProvider()
+ fun sinkCreatorFactory(dummyMode: Boolean,
+ kafkaConfig: KafkaConfiguration): SinkProvider =
+ if (dummyMode)
+ LoggingSinkProvider()
+ else
+ KafkaSinkProvider(kafkaConfig)
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index e4453c90..717da092 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
@@ -107,12 +108,11 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- val routing = configuration.getJsonArray("collector.routing")
+ val routingArray = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
- kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
- routing = org.onap.dcae.collectors.veshv.model.routing {
- for (route in routing) {
+ routing {
+ for (route in routingArray) {
val routeObj = route.asJsonObject()
defineRoute {
fromDomain(routeObj.getString("fromDomain"))
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 14966d9b..7535fbee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ override fun invoke(ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b4f9a90c..73c852d6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,19 +20,20 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,44 +41,39 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
private val ctx: ClientContext) : Sink {
- private val sentMessages = AtomicLong(0)
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- val records = messages.map(this::vesToKafkaRecord)
- val result = sender.send(records)
- .doOnNext {
- if (it.isSuccessful()) {
- Mono.just(it)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.map(::vesToKafkaRecord).let { records ->
+ sender.send(records).map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
} else {
- logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
- Mono.empty<SenderResult<RoutedMessage>>()
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
}
- .map { it.correlationMetadata() }
-
- return result.doOnNext(::logSentMessage)
- }
+ }
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
- return SenderRecord.create(
- msg.topic,
- msg.partition,
- System.currentTimeMillis(),
- msg.message.header,
- msg.message,
- msg)
- }
-
- private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- val msgNum = sentMessages.incrementAndGet()
- "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
- }
- }
+ private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
+ SenderRecord.create(
+ routed.topic,
+ routed.partition,
+ FILL_TIMESTAMP_LATER,
+ routed.message.header,
+ routed.message,
+ routed)
- private fun SenderResult<out Any>.isSuccessful() = exception() == null
+ internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
companion object {
- val logger = Logger(KafkaSink::class)
+ private val FILL_TIMESTAMP_LATER: Long? = null
+ private val logger = Logger(KafkaSink::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index b4f470d4..2fa4f545 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,11 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
@@ -33,14 +38,25 @@ import reactor.kafka.sender.SenderOptions
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
- }
+internal class KafkaSinkProvider internal constructor(
+ private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
+
+ constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+
+ override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ companion object {
+ private fun constructKafkaSender(config: KafkaConfiguration) =
+ KafkaSender.create(constructSenderOptions(config))
+
+ private fun constructSenderOptions(config: KafkaConfiguration) =
+ SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+ .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(RETRIES_CONFIG, 1)
+ .producerProperty(ACKS_CONFIG, "1")
+ .stopOnError(false)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
index ec546c7d..f65b97ca 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
+data class CollectorConfiguration(val routing: Routing)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
new file mode 100644
index 00000000..f65e157d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class KafkaConfiguration(val bootstrapServers: String)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 85117684..7e5044f9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -29,6 +29,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val serverListenAddress: InetSocketAddress,
+ val kafkaConfiguration: KafkaConfiguration,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
new file mode 100644
index 00000000..29c418a4
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.lang.Exception
+
+sealed class ConsumedMessage {
+ abstract val message: RoutedMessage
+}
+
+data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage()
+
+data class FailedToConsumeMessage(
+ override val message: RoutedMessage,
+ val exception: Exception?,
+ val cause: MessageDropCause) : ConsumedMessage()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
index 836eab53..ab7b196a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException
*/
enum class MessageDropCause(val tag: String) {
ROUTE_NOT_FOUND("routing"),
- INVALID_MESSAGE("invalid")
+ INVALID_MESSAGE("invalid"),
+ KAFKA_FAILURE("kafka")
}
enum class ClientRejectionCause(val tag: String) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9ce0c3db..a92d3763 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
-
val route1 = it.routing.routes[0]
assertThat(FAULT.domainName)
.describedAs("routed domain 1")
@@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String,
)
}
-
-const val kafkaAddress = "message-router-kafka"
-
fun constructConsulResponse(): String =
"""{
- "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "whatever": "garbage",
"collector.routing": [
{
"fromDomain": "fault",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
new file mode 100644
index 00000000..3a924e48
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import arrow.syntax.collections.tail
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object KafkaSinkProviderTest : Spek({
+ describe("non functional requirements") {
+ given("sample configuration") {
+ val config = KafkaConfiguration("localhost:9090")
+ val cut = KafkaSinkProvider(config)
+
+ on("sample clients") {
+ val clients = listOf(
+ ClientContext(),
+ ClientContext(),
+ ClientContext(),
+ ClientContext())
+
+ it("should create only one instance of KafkaSender") {
+ val sinks = clients.map(cut::invoke)
+ val firstSink = sinks[0]
+ val restOfSinks = sinks.tail()
+
+ assertThat(restOfSinks).isNotEmpty
+ assertThat(restOfSinks).allSatisfy { sink ->
+ assertThat(firstSink.usesSameSenderAs(sink))
+ .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
+ .isTrue()
+ }
+ }
+ }
+ }
+ }
+})