aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-18 15:58:56 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-20 14:57:25 +0100
commit4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch)
treecff4cf2428a288b7b86830f282b81d41a41ad250 /sources/hv-collector-main/src
parent4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff)
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-main/src')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt5
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt2
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt6
4 files changed, 13 insertions, 3 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 9b985f6f..ae87f1c2 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
@@ -52,6 +54,7 @@ import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
+ KAFKA_SERVERS,
HEALTH_CHECK_API_PORT,
LISTEN_PORT,
CONSUL_CONFIG_URL,
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
HEALTH_CHECK_API_PORT,
DefaultValues.HEALTH_CHECK_API_PORT
)
+ val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
serverListenAddress = InetSocketAddress(listenPort),
+ kafkaConfiguration = KafkaConfiguration(kafkaServers),
healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 288145aa..f3bcf381 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -73,7 +73,7 @@ class MicrometerMetrics internal constructor(
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
- (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
+ (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0)
}
registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index f9be546a..4e2e6d86 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -36,10 +36,9 @@ object VesServer : ServerStarter() {
override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
private fun createVesServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
+ AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 1aac6a09..9dddeca9 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -39,6 +39,7 @@ import kotlin.test.assertNotNull
*/
object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
+ val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
val healthCheckApiPort = "6070"
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
@@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess(
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
"--config-url", configurationUrl,
@@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({
)
}
+ it("should set proper kafka bootstrap servers") {
+ assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
+ }
+
it("should set proper listen port") {
assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
}