diff options
27 files changed, 297 insertions, 113 deletions
diff --git a/development/bin/consul.sh b/development/bin/consul.sh index c229f83e..39f0bdef 100755 --- a/development/bin/consul.sh +++ b/development/bin/consul.sh @@ -61,7 +61,6 @@ TOPIC=${2:-HV_VES_PERF3GPP} CONFIGURATION=" { - \"dmaap.kafkaBootstrapServers\": \"message-router-kafka:9092\", \"collector.routing\": [{ \"fromDomain\": \"${DOMAIN}\", diff --git a/development/docker-compose.yml b/development/docker-compose.yml index a64c62da..adf8947d 100644 --- a/development/docker-compose.yml +++ b/development/docker-compose.yml @@ -44,7 +44,6 @@ services: - consul-server restart: on-failure command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{ - "dmaap.kafkaBootstrapServers": "message-router-kafka:9092", "collector.routing": [ { "fromDomain": "perf3gpp", @@ -63,13 +62,14 @@ services: ports: - "6060:6060" - "6061:6061/tcp" - entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", - "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] command: ["--listen-port", "6061", "--health-check-api-port", "6060", "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true", + "--kafka-bootstrap-servers", "message-router-kafka:9092", "--key-store-password", "onaponap", "--trust-store-password", "onaponap"] + environment: + JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid" healthcheck: test: curl -f http://localhost:6060/health/ready || exit 1 interval: 10s diff --git a/development/start-simulation.sh b/development/start-simulation.sh index 70e4aaeb..6f38ea7b 100755 --- a/development/start-simulation.sh +++ b/development/start-simulation.sh @@ -25,7 +25,7 @@ curl --header 'Content-Type: application/json' --request POST \ "vesEventListenerVersion": "7.2" }, "messageType": "VALID", - "messagesAmount": 1 + "messagesAmount": 1000000 } ]' \ http://localhost:6062/simulator/async diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index ac55e55f..e4a73947 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage import reactor.core.publisher.Flux interface Sink { - fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> + fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> } interface Metrics { @@ -41,14 +42,14 @@ interface Metrics { fun notifyClientRejected(cause: ClientRejectionCause) } -@FunctionalInterface interface SinkProvider { - operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink + operator fun invoke(ctx: ClientContext): Sink companion object { fun just(sink: Sink): SinkProvider = object : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink + override fun invoke( + ctx: ClientContext): Sink = sink } } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2008fc35..fe2b89d5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -59,18 +59,20 @@ class CollectorFactory(val configuration: ConfigurationProvider, healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(config::set) + return { ctx: ClientContext -> - config.getOption().map { config -> createVesHvCollector(config, ctx) } + config.getOption().map { createVesHvCollector(it, ctx) } } } - private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector( - clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), - protobufDecoder = VesDecoder(), - router = Router(config.routing, ctx), - sink = sinkProvider(config, ctx), - metrics = metrics) + private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = + VesHvCollector( + clientContext = ctx, + wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx), + protobufDecoder = VesDecoder(), + router = Router(config.routing, ctx), + sink = sinkProvider(ctx), + metrics = metrics) companion object { private val logger = Logger(CollectorFactory::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 5c3f339c..fd01c9d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -28,9 +28,11 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleR import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure @@ -96,10 +98,10 @@ internal class VesHvCollector( .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) } } - private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux + private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) - .doOnNext(metrics::notifyMessageSent) + .doOnNext(this::updateSinkMetrics) private fun findRoute(msg: VesMessage) = router .findDestination(msg) @@ -108,6 +110,15 @@ internal class VesHvCollector( { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) + private fun updateSinkMetrics(consumedMessage: ConsumedMessage) { + when (consumedMessage) { + is SuccessfullyConsumedMessage -> + metrics.notifyMessageSent(consumedMessage.message) + is FailedToConsumeMessage -> + metrics.notifyMessageDropped(consumedMessage.cause) + } + } + private fun releaseBuffersMemory() = wireChunkDecoder.release() .also { logger.debug { "Released buffer memory after handling message stream" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8c16736d..75b6f0a6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import reactor.netty.http.client.HttpClient /** @@ -30,8 +31,12 @@ import reactor.netty.http.client.HttpClient * @since May 2018 */ object AdapterFactory { - fun kafkaSink(): SinkProvider = KafkaSinkProvider() - fun loggingSink(): SinkProvider = LoggingSinkProvider() + fun sinkCreatorFactory(dummyMode: Boolean, + kafkaConfig: KafkaConfiguration): SinkProvider = + if (dummyMode) + LoggingSinkProvider() + else + KafkaSinkProvider(kafkaConfig) fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index e4453c90..717da092 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Flux @@ -107,12 +108,11 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - val routing = configuration.getJsonArray("collector.routing") + val routingArray = configuration.getJsonArray("collector.routing") return CollectorConfiguration( - kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"), - routing = org.onap.dcae.collectors.veshv.model.routing { - for (route in routing) { + routing { + for (route in routingArray) { val routeObj = route.asJsonObject() defineRoute { fromDomain(routeObj.getString("fromDomain")) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 14966d9b..7535fbee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -21,11 +21,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicLong @@ -36,14 +37,13 @@ import java.util.concurrent.atomic.AtomicLong */ internal class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { + override fun invoke(ctx: ClientContext): Sink { return object : Sink { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = - messages - .doOnNext(this::logMessage) + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = + messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage) private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index b4f9a90c..73c852d6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,19 +20,20 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux -import reactor.core.publisher.Mono import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord -import reactor.kafka.sender.SenderResult -import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -40,44 +41,39 @@ import java.util.concurrent.atomic.AtomicLong */ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink { - private val sentMessages = AtomicLong(0) - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - val records = messages.map(this::vesToKafkaRecord) - val result = sender.send(records) - .doOnNext { - if (it.isSuccessful()) { - Mono.just(it) + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = + messages.map(::vesToKafkaRecord).let { records -> + sender.send(records).map { + val msg = it.correlationMetadata() + if (it.exception() == null) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { + "Message sent to Kafka with metadata: ${it.recordMetadata()}" + } + SuccessfullyConsumedMessage(msg) } else { - logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) } - Mono.empty<SenderResult<RoutedMessage>>() + logger.warn(ctx::fullMdc, Marker.Invoke()) { + "Failed to send message to Kafka. Reason: ${it.exception().message}" + } + logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) } + FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE) } } - .map { it.correlationMetadata() } - - return result.doOnNext(::logSentMessage) - } + } - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { - return SenderRecord.create( - msg.topic, - msg.partition, - System.currentTimeMillis(), - msg.message.header, - msg.message, - msg) - } - - private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace(ctx::fullMdc, Marker.Invoke()) { - val msgNum = sentMessages.incrementAndGet() - "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" - } - } + private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> = + SenderRecord.create( + routed.topic, + routed.partition, + FILL_TIMESTAMP_LATER, + routed.message.header, + routed.message, + routed) - private fun SenderResult<out Any>.isSuccessful() = exception() == null + internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender companion object { - val logger = Logger(KafkaSink::class) + private val FILL_TIMESTAMP_LATER: Long? = null + private val logger = Logger(KafkaSink::class) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index b4f470d4..2fa4f545 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -19,11 +19,16 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters.kafka -import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION +import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender @@ -33,14 +38,25 @@ import reactor.kafka.sender.SenderOptions * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx) - } +internal class KafkaSinkProvider internal constructor( + private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider { + + constructor(config: KafkaConfiguration) : this(constructKafkaSender(config)) + + override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx) - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + companion object { + private fun constructKafkaSender(config: KafkaConfiguration) = + KafkaSender.create(constructSenderOptions(config)) + + private fun constructSenderOptions(config: KafkaConfiguration) = + SenderOptions.create<CommonEventHeader, VesMessage>() + .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) + .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) + .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) + .producerProperty(RETRIES_CONFIG, 1) + .producerProperty(ACKS_CONFIG, "1") + .stopOnError(false) + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt index ec546c7d..f65b97ca 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt @@ -23,4 +23,4 @@ package org.onap.dcae.collectors.veshv.model * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing) +data class CollectorConfiguration(val routing: Routing) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt new file mode 100644 index 00000000..f65e157d --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +data class KafkaConfiguration(val bootstrapServers: String) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 85117684..7e5044f9 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -29,6 +29,7 @@ import java.time.Duration */ data class ServerConfiguration( val serverListenAddress: InetSocketAddress, + val kafkaConfiguration: KafkaConfiguration, val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt new file mode 100644 index 00000000..29c418a4 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import java.lang.Exception + +sealed class ConsumedMessage { + abstract val message: RoutedMessage +} + +data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage() + +data class FailedToConsumeMessage( + override val message: RoutedMessage, + val exception: Exception?, + val cause: MessageDropCause) : ConsumedMessage() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt index 836eab53..ab7b196a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt @@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireFrameException */ enum class MessageDropCause(val tag: String) { ROUTE_NOT_FOUND("routing"), - INVALID_MESSAGE("invalid") + INVALID_MESSAGE("invalid"), + KAFKA_FAILURE("kafka") } enum class ClientRejectionCause(val tag: String) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 9ce0c3db..a92d3763 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({ StepVerifier.create(consulConfigProvider().take(1)) .consumeNextWith { - assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) - val route1 = it.routing.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") @@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String, ) } - -const val kafkaAddress = "message-router-kafka" - fun constructConsulResponse(): String = """{ - "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", + "whatever": "garbage", "collector.routing": [ { "fromDomain": "fault", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt new file mode 100644 index 00000000..3a924e48 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import arrow.syntax.collections.tail +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +internal object KafkaSinkProviderTest : Spek({ + describe("non functional requirements") { + given("sample configuration") { + val config = KafkaConfiguration("localhost:9090") + val cut = KafkaSinkProvider(config) + + on("sample clients") { + val clients = listOf( + ClientContext(), + ClientContext(), + ClientContext(), + ClientContext()) + + it("should create only one instance of KafkaSender") { + val sinks = clients.map(cut::invoke) + val firstSink = sinks[0] + val restOfSinks = sinks.tail() + + assertThat(restOfSinks).isNotEmpty + assertThat(restOfSinks).allSatisfy { sink -> + assertThat(firstSink.usesSameSenderAs(sink)) + .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") + .isTrue() + } + } + } + } + } +}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index f457aeaf..aaa3ee3b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -31,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE +import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC @@ -50,7 +51,7 @@ object MetricsSpecification : Spek({ describe("Bytes received metrics") { it("should sum up all bytes received") { - val sut = vesHvWithNoOpSink() + val sut = vesHvWithAlwaysSuccessfulSink() val vesWireFrameMessage = vesWireFrameMessage() val invalidWireFrame = messageWithInvalidWireFrameHeader() @@ -70,7 +71,7 @@ object MetricsSpecification : Spek({ describe("Messages received metrics") { it("should sum up all received messages bytes") { - val sut = vesHvWithNoOpSink() + val sut = vesHvWithAlwaysSuccessfulSink() val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10))) val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40))) val firstVesMessage = vesWireFrameMessage(firstVesEvent) @@ -91,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -129,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithNoOpSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -145,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithNoOpSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -158,8 +159,19 @@ object MetricsSpecification : Spek({ .isEqualTo(1) } + it("should gather metrics for sing errors") { + val sut = vesHvWithAlwaysFailingSink(basicConfiguration) + + sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) + + val metrics = sut.metrics + assertThat(metrics.messagesDropped(KAFKA_FAILURE)) + .describedAs("messagesDroppedCause $KAFKA_FAILURE metric") + .isEqualTo(1) + } + it("should gather summed metrics for dropped messages") { - val sut = vesHvWithNoOpSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -183,7 +195,7 @@ object MetricsSpecification : Spek({ ).forEach { cause, vesMessage -> on("cause $cause") { it("should notify correct metrics") { - val sut = vesHvWithNoOpSink() + val sut = vesHvWithAlwaysSuccessfulSink() sut.handleConnection(vesMessage) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 7ebbfba0..c3e4a581 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -73,12 +73,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = - Sut(NoOpSink()).apply { +fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = + Sut(AlwaysSuccessfulSink()).apply { + configurationProvider.updateConfiguration(collectorConfiguration) + } + +fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = + Sut(AlwaysFailingSink()).apply { configurationProvider.updateConfiguration(collectorConfiguration) } fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = - Sut(ProcessingSink { it.delayElements(delay) }).apply { + Sut(DelayingSink(delay)).apply { configurationProvider.updateConfiguration(collectorConfiguration) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index 3770913a..db56e88c 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -36,7 +36,6 @@ const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { fromDomain(PERF3GPP.domainName) @@ -47,7 +46,6 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration( ) val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { fromDomain(PERF3GPP.domainName) @@ -69,7 +67,6 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { fromDomain(PERF3GPP.domainName) @@ -81,7 +78,6 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", routing = routing { }.build() ) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 2f731f53..b4ce6499 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -21,13 +21,17 @@ package org.onap.dcae.collectors.veshv.tests.fakes import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.ConsumedMessage +import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage +import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicLong -import java.util.function.Function /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -39,8 +43,8 @@ class StoringSink : Sink { val sentMessages: List<RoutedMessage> get() = sent.toList() - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - return messages.doOnNext(sent::addLast) + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { + return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } } @@ -54,16 +58,23 @@ class CountingSink : Sink { val count: Long get() = atomicCount.get() - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { return messages.doOnNext { atomicCount.incrementAndGet() - } + }.map(::SuccessfullyConsumedMessage) } } -open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink { - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer) +open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink { + override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> = + messages.transform(transformer) } -class NoOpSink : ProcessingSink(::identity) +class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) }) + +class AlwaysFailingSink : ProcessingSink({ stream -> + stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) } +}) + +class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) }) diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile index ad7a03d6..3322059c 100644 --- a/sources/hv-collector-main/Dockerfile +++ b/sources/hv-collector-main/Dockerfile @@ -11,7 +11,7 @@ RUN apt-get update \ WORKDIR /opt/ves-hv-collector -ENTRYPOINT ["entry.sh"] +ENTRYPOINT ["./entry.sh"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 9b985f6f..ae87f1c2 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE @@ -52,6 +54,7 @@ import java.time.Duration internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { override val cmdLineOptionsList = listOf( + KAFKA_SERVERS, HEALTH_CHECK_API_PORT, LISTEN_PORT, CONSUL_CONFIG_URL, @@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration HEALTH_CHECK_API_PORT, DefaultValues.HEALTH_CHECK_API_PORT ) + val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, @@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() ServerConfiguration( serverListenAddress = InetSocketAddress(listenPort), + kafkaConfiguration = KafkaConfiguration(kafkaServers), healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), configurationProviderParams = configurationProviderParams, securityConfiguration = security, diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 288145aa..f3bcf381 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor( init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { - (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0) + (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0) } registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index f9be546a..4e2e6d86 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -36,10 +36,9 @@ object VesServer : ServerStarter() { override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() private fun createVesServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() val collectorProvider = CollectorFactory( AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, + AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index 1aac6a09..9dddeca9 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -39,6 +39,7 @@ import kotlin.test.assertNotNull */ object ArgVesHvConfigurationTest : Spek({ lateinit var cut: ArgVesHvConfiguration + val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" val healthCheckApiPort = "6070" val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" @@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({ beforeEachTest { result = cut.parseExpectingSuccess( + "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, "--config-url", configurationUrl, @@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({ ) } + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers) + } + it("should set proper listen port") { assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt()) } |