aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdevelopment/bin/consul.sh1
-rw-r--r--development/docker-compose.yml6
-rwxr-xr-xdevelopment/start-simulation.sh2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt18
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt9
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt12
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt68
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt38
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt26
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt3
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt7
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt64
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt26
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt11
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt27
-rw-r--r--sources/hv-collector-main/Dockerfile2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt5
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt6
27 files changed, 297 insertions, 113 deletions
diff --git a/development/bin/consul.sh b/development/bin/consul.sh
index c229f83e..39f0bdef 100755
--- a/development/bin/consul.sh
+++ b/development/bin/consul.sh
@@ -61,7 +61,6 @@ TOPIC=${2:-HV_VES_PERF3GPP}
CONFIGURATION="
{
- \"dmaap.kafkaBootstrapServers\": \"message-router-kafka:9092\",
\"collector.routing\":
[{
\"fromDomain\": \"${DOMAIN}\",
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index a64c62da..adf8947d 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -44,7 +44,6 @@ services:
- consul-server
restart: on-failure
command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
- "dmaap.kafkaBootstrapServers": "message-router-kafka:9092",
"collector.routing": [
{
"fromDomain": "perf3gpp",
@@ -63,13 +62,14 @@ services:
ports:
- "6060:6060"
- "6061:6061/tcp"
- entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
- "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
"--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
+ "--kafka-bootstrap-servers", "message-router-kafka:9092",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
+ environment:
+ JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid"
healthcheck:
test: curl -f http://localhost:6060/health/ready || exit 1
interval: 10s
diff --git a/development/start-simulation.sh b/development/start-simulation.sh
index 70e4aaeb..6f38ea7b 100755
--- a/development/start-simulation.sh
+++ b/development/start-simulation.sh
@@ -25,7 +25,7 @@ curl --header 'Content-Type: application/json' --request POST \
"vesEventListenerVersion": "7.2"
},
"messageType": "VALID",
- "messagesAmount": 1
+ "messagesAmount": 1000000
}
]' \
http://localhost:6062/simulator/async
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index ac55e55f..e4a73947 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import reactor.core.publisher.Flux
interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+ fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
}
interface Metrics {
@@ -41,14 +42,14 @@ interface Metrics {
fun notifyClientRejected(cause: ClientRejectionCause)
}
-@FunctionalInterface
interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
+ operator fun invoke(ctx: ClientContext): Sink
companion object {
fun just(sink: Sink): SinkProvider =
object : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
+ override fun invoke(
+ ctx: ClientContext): Sink = sink
}
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2008fc35..fe2b89d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -59,18 +59,20 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
+
return { ctx: ClientContext ->
- config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
}
}
- private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
- clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
- protobufDecoder = VesDecoder(),
- router = Router(config.routing, ctx),
- sink = sinkProvider(config, ctx),
- metrics = metrics)
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+ VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 5c3f339c..fd01c9d8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -28,9 +28,11 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleR
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
@@ -96,10 +98,10 @@ internal class VesHvCollector(
.doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
}
- private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext(metrics::notifyMessageSent)
+ .doOnNext(this::updateSinkMetrics)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
@@ -108,6 +110,15 @@ internal class VesHvCollector(
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
+ private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
+ when (consumedMessage) {
+ is SuccessfullyConsumedMessage ->
+ metrics.notifyMessageSent(consumedMessage.message)
+ is FailedToConsumeMessage ->
+ metrics.notifyMessageDropped(consumedMessage.cause)
+ }
+ }
+
private fun releaseBuffersMemory() = wireChunkDecoder.release()
.also { logger.debug { "Released buffer memory after handling message stream" } }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8c16736d..75b6f0a6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import reactor.netty.http.client.HttpClient
/**
@@ -30,8 +31,12 @@ import reactor.netty.http.client.HttpClient
* @since May 2018
*/
object AdapterFactory {
- fun kafkaSink(): SinkProvider = KafkaSinkProvider()
- fun loggingSink(): SinkProvider = LoggingSinkProvider()
+ fun sinkCreatorFactory(dummyMode: Boolean,
+ kafkaConfig: KafkaConfiguration): SinkProvider =
+ if (dummyMode)
+ LoggingSinkProvider()
+ else
+ KafkaSinkProvider(kafkaConfig)
fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index e4453c90..717da092 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
@@ -107,12 +108,11 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- val routing = configuration.getJsonArray("collector.routing")
+ val routingArray = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
- kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
- routing = org.onap.dcae.collectors.veshv.model.routing {
- for (route in routing) {
+ routing {
+ for (route in routingArray) {
val routeObj = route.asJsonObject()
defineRoute {
fromDomain(routeObj.getString("fromDomain"))
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 14966d9b..7535fbee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+ override fun invoke(ctx: ClientContext): Sink {
return object : Sink {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b4f9a90c..73c852d6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,19 +20,20 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,44 +41,39 @@ import java.util.concurrent.atomic.AtomicLong
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
private val ctx: ClientContext) : Sink {
- private val sentMessages = AtomicLong(0)
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- val records = messages.map(this::vesToKafkaRecord)
- val result = sender.send(records)
- .doOnNext {
- if (it.isSuccessful()) {
- Mono.just(it)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.map(::vesToKafkaRecord).let { records ->
+ sender.send(records).map {
+ val msg = it.correlationMetadata()
+ if (it.exception() == null) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
+ "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+ }
+ SuccessfullyConsumedMessage(msg)
} else {
- logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
- Mono.empty<SenderResult<RoutedMessage>>()
+ logger.warn(ctx::fullMdc, Marker.Invoke()) {
+ "Failed to send message to Kafka. Reason: ${it.exception().message}"
+ }
+ logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+ FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
}
}
- .map { it.correlationMetadata() }
-
- return result.doOnNext(::logSentMessage)
- }
+ }
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
- return SenderRecord.create(
- msg.topic,
- msg.partition,
- System.currentTimeMillis(),
- msg.message.header,
- msg.message,
- msg)
- }
-
- private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::fullMdc, Marker.Invoke()) {
- val msgNum = sentMessages.incrementAndGet()
- "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
- }
- }
+ private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
+ SenderRecord.create(
+ routed.topic,
+ routed.partition,
+ FILL_TIMESTAMP_LATER,
+ routed.message.header,
+ routed.message,
+ routed)
- private fun SenderResult<out Any>.isSuccessful() = exception() == null
+ internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
companion object {
- val logger = Logger(KafkaSink::class)
+ private val FILL_TIMESTAMP_LATER: Long? = null
+ private val logger = Logger(KafkaSink::class)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index b4f470d4..2fa4f545 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,11 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
@@ -33,14 +38,25 @@ import reactor.kafka.sender.SenderOptions
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
- }
+internal class KafkaSinkProvider internal constructor(
+ private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
+
+ constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+
+ override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ companion object {
+ private fun constructKafkaSender(config: KafkaConfiguration) =
+ KafkaSender.create(constructSenderOptions(config))
+
+ private fun constructSenderOptions(config: KafkaConfiguration) =
+ SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+ .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+ .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+ .producerProperty(RETRIES_CONFIG, 1)
+ .producerProperty(ACKS_CONFIG, "1")
+ .stopOnError(false)
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
index ec546c7d..f65b97ca 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
@@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
+data class CollectorConfiguration(val routing: Routing)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
new file mode 100644
index 00000000..f65e157d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class KafkaConfiguration(val bootstrapServers: String)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 85117684..7e5044f9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -29,6 +29,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val serverListenAddress: InetSocketAddress,
+ val kafkaConfiguration: KafkaConfiguration,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
new file mode 100644
index 00000000..29c418a4
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.lang.Exception
+
+sealed class ConsumedMessage {
+ abstract val message: RoutedMessage
+}
+
+data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage()
+
+data class FailedToConsumeMessage(
+ override val message: RoutedMessage,
+ val exception: Exception?,
+ val cause: MessageDropCause) : ConsumedMessage()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
index 836eab53..ab7b196a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException
*/
enum class MessageDropCause(val tag: String) {
ROUTE_NOT_FOUND("routing"),
- INVALID_MESSAGE("invalid")
+ INVALID_MESSAGE("invalid"),
+ KAFKA_FAILURE("kafka")
}
enum class ClientRejectionCause(val tag: String) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9ce0c3db..a92d3763 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
-
val route1 = it.routing.routes[0]
assertThat(FAULT.domainName)
.describedAs("routed domain 1")
@@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String,
)
}
-
-const val kafkaAddress = "message-router-kafka"
-
fun constructConsulResponse(): String =
"""{
- "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "whatever": "garbage",
"collector.routing": [
{
"fromDomain": "fault",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
new file mode 100644
index 00000000..3a924e48
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import arrow.syntax.collections.tail
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object KafkaSinkProviderTest : Spek({
+ describe("non functional requirements") {
+ given("sample configuration") {
+ val config = KafkaConfiguration("localhost:9090")
+ val cut = KafkaSinkProvider(config)
+
+ on("sample clients") {
+ val clients = listOf(
+ ClientContext(),
+ ClientContext(),
+ ClientContext(),
+ ClientContext())
+
+ it("should create only one instance of KafkaSender") {
+ val sinks = clients.map(cut::invoke)
+ val firstSink = sinks[0]
+ val restOfSinks = sinks.tail()
+
+ assertThat(restOfSinks).isNotEmpty
+ assertThat(restOfSinks).allSatisfy { sink ->
+ assertThat(firstSink.usesSameSenderAs(sink))
+ .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
+ .isTrue()
+ }
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index f457aeaf..aaa3ee3b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -31,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
@@ -50,7 +51,7 @@ object MetricsSpecification : Spek({
describe("Bytes received metrics") {
it("should sum up all bytes received") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val vesWireFrameMessage = vesWireFrameMessage()
val invalidWireFrame = messageWithInvalidWireFrameHeader()
@@ -70,7 +71,7 @@ object MetricsSpecification : Spek({
describe("Messages received metrics") {
it("should sum up all received messages bytes") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
val firstVesMessage = vesWireFrameMessage(firstVesEvent)
@@ -91,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -129,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -145,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -158,8 +159,19 @@ object MetricsSpecification : Spek({
.isEqualTo(1)
}
+ it("should gather metrics for sing errors") {
+ val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+
+ sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDropped(KAFKA_FAILURE))
+ .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
+ .isEqualTo(1)
+ }
+
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -183,7 +195,7 @@ object MetricsSpecification : Spek({
).forEach { cause, vesMessage ->
on("cause $cause") {
it("should notify correct metrics") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
sut.handleConnection(vesMessage)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 7ebbfba0..c3e4a581 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -73,12 +73,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(NoOpSink()).apply {
+fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysSuccessfulSink()).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
+
+fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysFailingSink()).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(ProcessingSink { it.delayElements(delay) }).apply {
+ Sut(DelayingSink(delay)).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 3770913a..db56e88c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -36,7 +36,6 @@ const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -47,7 +46,6 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
)
val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -69,7 +67,6 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -81,7 +78,6 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
}.build()
)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 2f731f53..b4ce6499 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -21,13 +21,17 @@ package org.onap.dcae.collectors.veshv.tests.fakes
import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Function
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,8 +43,8 @@ class StoringSink : Sink {
val sentMessages: List<RoutedMessage>
get() = sent.toList()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- return messages.doOnNext(sent::addLast)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
+ return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
}
@@ -54,16 +58,23 @@ class CountingSink : Sink {
val count: Long
get() = atomicCount.get()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext {
atomicCount.incrementAndGet()
- }
+ }.map(::SuccessfullyConsumedMessage)
}
}
-open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
+open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.transform(transformer)
}
-class NoOpSink : ProcessingSink(::identity)
+class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) })
+
+class AlwaysFailingSink : ProcessingSink({ stream ->
+ stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) }
+})
+
+class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) })
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
index ad7a03d6..3322059c 100644
--- a/sources/hv-collector-main/Dockerfile
+++ b/sources/hv-collector-main/Dockerfile
@@ -11,7 +11,7 @@ RUN apt-get update \
WORKDIR /opt/ves-hv-collector
-ENTRYPOINT ["entry.sh"]
+ENTRYPOINT ["./entry.sh"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 9b985f6f..ae87f1c2 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
@@ -52,6 +54,7 @@ import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
+ KAFKA_SERVERS,
HEALTH_CHECK_API_PORT,
LISTEN_PORT,
CONSUL_CONFIG_URL,
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
HEALTH_CHECK_API_PORT,
DefaultValues.HEALTH_CHECK_API_PORT
)
+ val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
serverListenAddress = InetSocketAddress(listenPort),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers),
healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 288145aa..f3bcf381 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor(
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
- (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
+ (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0)
}
registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index f9be546a..4e2e6d86 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -36,10 +36,9 @@ object VesServer : ServerStarter() {
override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
private fun createVesServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
+ AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 1aac6a09..9dddeca9 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -39,6 +39,7 @@ import kotlin.test.assertNotNull
*/
object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
+ val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
val healthCheckApiPort = "6070"
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
@@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess(
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
"--config-url", configurationUrl,
@@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({
)
}
+ it("should set proper kafka bootstrap servers") {
+ assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
+ }
+
it("should set proper listen port") {
assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
}