aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-18 15:58:56 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-20 14:57:25 +0100
commit4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch)
treecff4cf2428a288b7b86830f282b81d41a41ad250
parent4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff)
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rwxr-xr-xdevelopment/bin/consul.sh1
-rw-r--r--development/docker-compose.yml6
-rwxr-xr-xdevelopment/start-simulation.sh2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt12
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt68
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt38
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt26
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt3
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt7
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt64
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt26
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt11
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt27
-rw-r--r--sources/hv-collector-main/Dockerfile2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt5
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt6
27 files changed, 297 insertions, 113 deletions
diff --git a/development/bin/consul.sh b/development/bin/consul.sh
index c229f83e..39f0bdef 100755
--- a/development/bin/consul.sh
+++ b/development/bin/consul.sh
@@ -61,7 +61,6 @@ TOPIC=${2:-HV_VES_PERF3GPP}
CONFIGURATION="
{
- \"dmaap.kafkaBootstrapServers\": \"message-router-kafka:9092\",
\"collector.routing\":
[{
\"fromDomain\": \"${DOMAIN}\",
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index a64c62da..adf8947d 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -44,7 +44,6 @@ services:
- consul-server
restart: on-failure
command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
- "dmaap.kafkaBootstrapServers": "message-router-kafka:9092",
"collector.routing": [
{
"fromDomain": "perf3gpp",
@@ -63,13 +62,14 @@ services:
ports:
- "6060:6060"
- "6061:6061/tcp"
- entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
- "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
"--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
+ "--kafka-bootstrap-servers", "message-router-kafka:9092",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
+ environment:
+ JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid"
healthcheck:
test: curl -f http://localhost:6060/health/ready || exit 1
interval: 10s
diff --git a/development/start-simulation.sh b/development/start-simulation.sh
index 70e4aaeb..6f38ea7b 100755
--- a/development/start-simulation.sh
+++ b/development/start-simulation.sh
@@ -25,7 +25,7 @@ curl --header 'Content-Type: application/json' --request POST \
"vesEventListenerVersion": "7.2"
},
"messageType": "VALID",
- "messagesAmount": 1
+ "messagesAmount": 1000000
}
]' \
http://localhost:6062/simulator/async
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index ac55e55f..e4a73947 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+ fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
interface Metrics {
@@ -41,14 +42,14 @@ interface Metrics {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-@FunctionalInterface
interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
+ operator fun invoke(ctx: ClientContext): Sink
companion object {
fun just(sink: Sink): SinkProvider =
object : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
+ override fun invoke(
+ ctx: ClientContext): Sink = sink
}
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2008fc35..fe2b89d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -59,18 +59,20 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
+
return { ctx: ClientContext ->
- config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
}
}
- private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
- clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
- protobufDecoder = VesDecoder(),
- router = Router(config.routing, ctx),
- sink = sinkProvider(config, ctx),
- metrics = metrics)
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+ VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 5c3f339c..fd01c9d8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -28,9 +28,11 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleR
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
@@ -96,10 +98,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext(metrics::notifyMessageSent)
+ .doOnNext(this::updateSinkMetrics)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
@@ -108,6 +110,15 @@ internal class VesHvCollector(
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
+ private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
+ when (consumedMessage) {
+ is SuccessfullyConsumedMessage ->
+ metrics.notifyMessageSent(consumedMessage.message)
+ is FailedToConsumeMessage ->
+ metrics.notifyMessageDropped(consumedMessage.cause)
+ }
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8c16736d..75b6f0a6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import reactor.netty.http.client.HttpClient
/**
@@ -30,8 +31,12 @@ import reactor.netty.http.client.HttpClient
* @since May 2018
*/
object AdapterFactory {
- fun kafkaSink(): SinkProvider = KafkaSinkProvider()
- fun loggingSink(): SinkProvider = LoggingSinkProvider()
+ fun sinkCreatorFactory(dummyMode: Boolean,
+ kafkaConfig: KafkaConfiguration): SinkProvider =
+ if (dummyMode)
+ LoggingSinkProvider()
+ else
+ KafkaSinkProvider(kafkaConfig)
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index e4453c90..717da092 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
@@ -107,12 +108,11 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- val routing = configuration.getJsonArray("collector.routing")
+ val routingArray = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
- kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
- routing = org.onap.dcae.collectors.veshv.model.routing {
- for (route in routing) {
+ routing {
+ for (route in routingArray) {
val routeObj = route.asJsonObject()
defineRoute {
fromDomain(routeObj.getString("fromDomain"))
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 14966d9b..7535fbee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ override fun invoke(ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b4f9a90c..73c852d6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,19 +20,20 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,44 +41,39 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
private val ctx: ClientContext) : Sink {
- private val sentMessages = AtomicLong(0)
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- val records = messages.map(this::vesToKafkaRecord)
- val result = sender.send(records)
- .doOnNext {
- if (it.isSuccessful()) {
- Mono.just(it)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.map(::vesToKafkaRecord).let { records ->
+ sender.send(records).map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
} else {
- logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
- Mono.empty<SenderResult<RoutedMessage>>()
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
}
- .map { it.correlationMetadata() }
-
- return result.doOnNext(::logSentMessage)
- }
+ }
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
- return SenderRecord.create(
- msg.topic,
- msg.partition,
- System.currentTimeMillis(),
- msg.message.header,
- msg.message,
- msg)
- }
-
- private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- val msgNum = sentMessages.incrementAndGet()
- "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
- }
- }
+ private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
+ SenderRecord.create(
+ routed.topic,
+ routed.partition,
+ FILL_TIMESTAMP_LATER,
+ routed.message.header,
+ routed.message,
+ routed)
- private fun SenderResult<out Any>.isSuccessful() = exception() == null
+ internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
companion object {
- val logger = Logger(KafkaSink::class)
+ private val FILL_TIMESTAMP_LATER: Long? = null
+ private val logger = Logger(KafkaSink::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index b4f470d4..2fa4f545 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,11 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
@@ -33,14 +38,25 @@ import reactor.kafka.sender.SenderOptions
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
- }
+internal class KafkaSinkProvider internal constructor(
+ private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
+
+ constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+
+ override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ companion object {
+ private fun constructKafkaSender(config: KafkaConfiguration) =
+ KafkaSender.create(constructSenderOptions(config))
+
+ private fun constructSenderOptions(config: KafkaConfiguration) =
+ SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+ .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(RETRIES_CONFIG, 1)
+ .producerProperty(ACKS_CONFIG, "1")
+ .stopOnError(false)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
index ec546c7d..f65b97ca 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
+data class CollectorConfiguration(val routing: Routing)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
new file mode 100644
index 00000000..f65e157d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class KafkaConfiguration(val bootstrapServers: String)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 85117684..7e5044f9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -29,6 +29,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val serverListenAddress: InetSocketAddress,
+ val kafkaConfiguration: KafkaConfiguration,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
new file mode 100644
index 00000000..29c418a4
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.lang.Exception
+
+sealed class ConsumedMessage {
+ abstract val message: RoutedMessage
+}
+
+data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage()
+
+data class FailedToConsumeMessage(
+ override val message: RoutedMessage,
+ val exception: Exception?,
+ val cause: MessageDropCause) : ConsumedMessage()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
index 836eab53..ab7b196a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException
*/
enum class MessageDropCause(val tag: String) {
ROUTE_NOT_FOUND("routing"),
- INVALID_MESSAGE("invalid")
+ INVALID_MESSAGE("invalid"),
+ KAFKA_FAILURE("kafka")
}
enum class ClientRejectionCause(val tag: String) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9ce0c3db..a92d3763 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
-
val route1 = it.routing.routes[0]
assertThat(FAULT.domainName)
.describedAs("routed domain 1")
@@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String,
)
}
-
-const val kafkaAddress = "message-router-kafka"
-
fun constructConsulResponse(): String =
"""{
- "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "whatever": "garbage",
"collector.routing": [
{
"fromDomain": "fault",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
new file mode 100644
index 00000000..3a924e48
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import arrow.syntax.collections.tail
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object KafkaSinkProviderTest : Spek({
+ describe("non functional requirements") {
+ given("sample configuration") {
+ val config = KafkaConfiguration("localhost:9090")
+ val cut = KafkaSinkProvider(config)
+
+ on("sample clients") {
+ val clients = listOf(
+ ClientContext(),
+ ClientContext(),
+ ClientContext(),
+ ClientContext())
+
+ it("should create only one instance of KafkaSender") {
+ val sinks = clients.map(cut::invoke)
+ val firstSink = sinks[0]
+ val restOfSinks = sinks.tail()
+
+ assertThat(restOfSinks).isNotEmpty
+ assertThat(restOfSinks).allSatisfy { sink ->
+ assertThat(firstSink.usesSameSenderAs(sink))
+ .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
+ .isTrue()
+ }
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index f457aeaf..aaa3ee3b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -31,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
@@ -50,7 +51,7 @@ object MetricsSpecification : Spek({
describe("Bytes received metrics") {
it("should sum up all bytes received") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val vesWireFrameMessage = vesWireFrameMessage()
val invalidWireFrame = messageWithInvalidWireFrameHeader()
@@ -70,7 +71,7 @@ object MetricsSpecification : Spek({
describe("Messages received metrics") {
it("should sum up all received messages bytes") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
val firstVesMessage = vesWireFrameMessage(firstVesEvent)
@@ -91,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -129,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -145,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -158,8 +159,19 @@ object MetricsSpecification : Spek({
.isEqualTo(1)
}
+ it("should gather metrics for sing errors") {
+ val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+
+ sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDropped(KAFKA_FAILURE))
+ .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
+ .isEqualTo(1)
+ }
+
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -183,7 +195,7 @@ object MetricsSpecification : Spek({
).forEach { cause, vesMessage ->
on("cause $cause") {
it("should notify correct metrics") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
sut.handleConnection(vesMessage)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 7ebbfba0..c3e4a581 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -73,12 +73,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(NoOpSink()).apply {
+fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysSuccessfulSink()).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
+
+fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysFailingSink()).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(ProcessingSink { it.delayElements(delay) }).apply {
+ Sut(DelayingSink(delay)).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 3770913a..db56e88c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -36,7 +36,6 @@ const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -47,7 +46,6 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
)
val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -69,7 +67,6 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -81,7 +78,6 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
}.build()
)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 2f731f53..b4ce6499 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -21,13 +21,17 @@ package org.onap.dcae.collectors.veshv.tests.fakes
import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Function
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,8 +43,8 @@ class StoringSink : Sink {
val sentMessages: List<RoutedMessage>
get() = sent.toList()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- return messages.doOnNext(sent::addLast)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
+ return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
}
@@ -54,16 +58,23 @@ class CountingSink : Sink {
val count: Long
get() = atomicCount.get()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext {
atomicCount.incrementAndGet()
- }
+ }.map(::SuccessfullyConsumedMessage)
}
}
-open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
+open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.transform(transformer)
}
-class NoOpSink : ProcessingSink(::identity)
+class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) })
+
+class AlwaysFailingSink : ProcessingSink({ stream ->
+ stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) }
+})
+
+class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) })
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
index ad7a03d6..3322059c 100644
--- a/sources/hv-collector-main/Dockerfile
+++ b/sources/hv-collector-main/Dockerfile
@@ -11,7 +11,7 @@ RUN apt-get update \
WORKDIR /opt/ves-hv-collector
-ENTRYPOINT ["entry.sh"]
+ENTRYPOINT ["./entry.sh"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 9b985f6f..ae87f1c2 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
@@ -52,6 +54,7 @@ import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
+ KAFKA_SERVERS,
HEALTH_CHECK_API_PORT,
LISTEN_PORT,
CONSUL_CONFIG_URL,
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
HEALTH_CHECK_API_PORT,
DefaultValues.HEALTH_CHECK_API_PORT
)
+ val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
serverListenAddress = InetSocketAddress(listenPort),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers),
healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 288145aa..f3bcf381 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor(
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
- (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
+ (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0)
}
registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index f9be546a..4e2e6d86 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -36,10 +36,9 @@ object VesServer : ServerStarter() {
override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
private fun createVesServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
+ AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 1aac6a09..9dddeca9 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -39,6 +39,7 @@ import kotlin.test.assertNotNull
*/
object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
+ val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
val healthCheckApiPort = "6070"
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
@@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess(
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
"--config-url", configurationUrl,
@@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({
)
}
+ it("should set proper kafka bootstrap servers") {
+ assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
+ }
+
it("should set proper listen port") {
assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
}