From 4128aa2c9368ed20fab92e8c0df83f14d6233b86 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 18 Dec 2018 15:58:56 +0100 Subject: There should be one KafkaSender per configuration We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk --- sources/hv-collector-main/Dockerfile | 2 +- .../org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt | 5 +++++ .../onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt | 2 +- .../kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt | 3 +-- .../onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt | 6 ++++++ 5 files changed, 14 insertions(+), 4 deletions(-) (limited to 'sources/hv-collector-main') diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile index ad7a03d6..3322059c 100644 --- a/sources/hv-collector-main/Dockerfile +++ b/sources/hv-collector-main/Dockerfile @@ -11,7 +11,7 @@ RUN apt-get update \ WORKDIR /opt/ves-hv-collector -ENTRYPOINT ["entry.sh"] +ENTRYPOINT ["./entry.sh"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 9b985f6f..ae87f1c2 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -27,10 +27,12 @@ import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE @@ -52,6 +54,7 @@ import java.time.Duration internal class ArgVesHvConfiguration : ArgBasedConfiguration(DefaultParser()) { override val cmdLineOptionsList = listOf( + KAFKA_SERVERS, HEALTH_CHECK_API_PORT, LISTEN_PORT, CONSUL_CONFIG_URL, @@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration = createVesServer(config).start() private fun createVesServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() val collectorProvider = CollectorFactory( AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, + AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index 1aac6a09..9dddeca9 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -39,6 +39,7 @@ import kotlin.test.assertNotNull */ object ArgVesHvConfigurationTest : Spek({ lateinit var cut: ArgVesHvConfiguration + val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" val healthCheckApiPort = "6070" val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" @@ -57,6 +58,7 @@ object ArgVesHvConfigurationTest : Spek({ beforeEachTest { result = cut.parseExpectingSuccess( + "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, "--config-url", configurationUrl, @@ -69,6 +71,10 @@ object ArgVesHvConfigurationTest : Spek({ ) } + it("should set proper kafka bootstrap servers") { + assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers) + } + it("should set proper listen port") { assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt()) } -- cgit 1.2.3-korg