diff options
Diffstat (limited to 'sources/hv-collector-main/src/main')
3 files changed, 7 insertions, 3 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 9b985f6f..ae87f1c2 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE @@ -52,6 +54,7 @@ import java.time.Duration internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { override val cmdLineOptionsList = listOf( + KAFKA_SERVERS, HEALTH_CHECK_API_PORT, LISTEN_PORT, CONSUL_CONFIG_URL, @@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration HEALTH_CHECK_API_PORT, DefaultValues.HEALTH_CHECK_API_PORT ) + val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind() val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, @@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() ServerConfiguration( serverListenAddress = InetSocketAddress(listenPort), + kafkaConfiguration = KafkaConfiguration(kafkaServers), healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), configurationProviderParams = configurationProviderParams, securityConfiguration = security, diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 288145aa..f3bcf381 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor( init { registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { - (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0) + (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0) } registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index f9be546a..4e2e6d86 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -36,10 +36,9 @@ object VesServer : ServerStarter() { override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() private fun createVesServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() val collectorProvider = CollectorFactory( AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, + AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() |