diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-18 15:58:56 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-20 14:57:25 +0100 |
commit | 4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch) | |
tree | cff4cf2428a288b7b86830f282b81d41a41ad250 /sources/hv-collector-main/src | |
parent | 4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff) |
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However,
as the configuration might be changed (Consul update) it cannot be a
strict singleton. Hence there should be 1to1 relationship beetween
ConsulConfiguration and KafkaSender.
Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668
Issue-ID: DCAEGEN2-1047
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main/src')
4 files changed, 13 insertions, 3 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 9b985f6f..ae87f1c2 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE @@ -52,6 +54,7 @@ import java.time.Duration internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { override val cmdLineOptionsList = listOf( + KAFKA_SERVERS, HEALTH_CHECK_API_PORT, LISTEN_PORT, CONSUL_CONFIG_URL, @@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration HEALTH_CHECK_API_PORT, DefaultValues.HEALTH_CHECK_API_PORT ) + val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, @@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() ServerConfiguration( serverListenAddress = InetSocketAddress(listenPort), + kafkaConfiguration = KafkaConfiguration(kafkaServers), healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), configurationProviderParams = configurationProviderParams, securityConfiguration = security, diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 288145aa..f3bcf381 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor( init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { - (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0) + (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0) } registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index f9be546a..4e2e6d86 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -36,10 +36,9 @@ object VesServer : ServerStarter() { override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() private fun createVesServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() val collectorProvider = CollectorFactory( AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, + AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index 1aac6a09..9dddeca9 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -39,6 +39,7 @@ import kotlin.test.assertNotNull */ object ArgVesHvConfigurationTest : Spek({ lateinit var cut: ArgVesHvConfiguration + val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" val healthCheckApiPort = "6070" val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" @@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({ beforeEachTest { result = cut.parseExpectingSuccess( + "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, "--config-url", configurationUrl, @@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({ ) } + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers) + } + it("should set proper listen port") { assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt()) } |