diff options
14 files changed, 184 insertions, 97 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 65951edc..d98971ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,14 @@ services: depends_on: - zookeeper + consul: + image: progrium/consul + ports: + - "8500:8500" + environment: + - CONSUL_BIND_INTERFACE=eth0 + command: ["-server", "-bootstrap", "-ui-dir", "/ui"] + ves-hv-collector: image: onap/ves-hv-collector # build: @@ -29,6 +37,7 @@ services: - "6061:6061/tcp" depends_on: - kafka + - consul volumes: - ./ssl/:/etc/ves-hv/ diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 8785180b..06047fd4 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -49,7 +49,9 @@ class CollectorFactory(val configuration: ConfigurationProvider, } private fun createVesHvCollector(): Flux<Collector> = - configuration().map(this::createVesHvCollector) + configuration() + .doOnError { System.exit(ERROR_CODE) } + .map(this::createVesHvCollector) private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( @@ -61,5 +63,8 @@ class CollectorFactory(val configuration: ConfigurationProvider, metrics = metrics) } + companion object { + const val ERROR_CODE = 3 + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 358be108..d9e7432d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -34,13 +33,8 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun staticConfigurationProvider(config: CollectorConfiguration) = - object : ConfigurationProvider { - override fun invoke() = Flux.just(config) - } - - fun consulConfigurationProvider(url: String): ConfigurationProvider = - ConsulConfigurationProvider(url, httpAdapter()) + fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider = + ConsulConfigurationProvider(url, updateInterval, httpAdapter()) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 04e4927d..c70d128a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.ves.VesEventV5 +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import java.io.StringReader import java.time.Duration -import java.util.* +import java.util.Base64 import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -36,20 +38,39 @@ import javax.json.JsonObject * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ -internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter) - : ConfigurationProvider { +internal class ConsulConfigurationProvider(private val url: String, + private val updateInterval: Duration, + private val http: HttpAdapter +) : ConfigurationProvider { private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) private var lastConfigurationHash: AtomicReference<Int> = AtomicReference() override fun invoke(): Flux<CollectorConfiguration> = - Flux.interval(Duration.ZERO, REFRESH_INTERVAL) - .flatMap { http.getResponse(url) } - .filter { body -> body.hashCode() != lastConfigurationHash.get() } - .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) } - .map { str -> getConfigurationJson(str) } - .map { json -> createCollectorConfiguration(json) } + Flux.concat(createDefaultConfigurationFlux(), createConsulFlux()) + + private fun createDefaultConfigurationFlux(): Flux<CollectorConfiguration> = Flux.just( + CollectorConfiguration( + kafkaBootstrapServers = "kafka:9092", + routing = routing { + defineRoute { + fromDomain(HVRANMEAS) + toTopic("ves_hvRanMeas") + withFixedPartitioning() + } + }.build()) + ).doOnNext { logger.info("Applied default configuration") } + + private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux.interval(updateInterval) + .flatMap { http.get(url) } + .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " + + "Shutting down..") } + .filter { it.hashCode() != lastConfigurationHash.get() } + .doOnNext { lastConfigurationHash.set(it.hashCode()) } + .map { getConfigurationJson(it) } + .map { createCollectorConfiguration(it) } + private fun getConfigurationJson(str: String): JsonObject { val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0) @@ -60,23 +81,21 @@ internal class ConsulConfigurationProvider(private val url: String, private val } private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - - val routing = configuration.getJsonObject("routing") + val routing = configuration.getJsonArray("routing") return CollectorConfiguration( kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"), routing = org.onap.dcae.collectors.veshv.model.routing { - defineRoute { - fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain"))) - toTopic(routing.getString("toTopic")) - withFixedPartitioning() + for (route in routing) { + val routeObj = route.asJsonObject() + defineRoute { + fromDomain(forNumber(routeObj.getInt("fromDomain"))) + toTopic(routeObj.getString("toTopic")) + withFixedPartitioning() + } } }.build() ) } - - companion object { - private const val REFRESH_INTERVAL_MINUTES: Long = 5 - private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES) - } } + diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 236e3cb7..a41cd09f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -34,15 +34,12 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun getResponse(url: String): Mono<String> = + open fun get(url: String): Mono<String> = httpClient.get(url) - .onErrorResume { e -> unableToGetResource(e, url) } - .flatMap { res -> res.receiveContent().toMono() } - .map { content -> content.content().toString(Charset.defaultCharset()) } - - - private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> { - logger.info("Failed to get resource on path: $url\n${e.localizedMessage}") - return Mono.empty() - } + .doOnError { + logger.error("Failed to get resource on path: $url (${it.localizedMessage})") + logger.debug("Nested exception:", it) + } + .flatMap { it.receiveContent().toMono() } + .map { it.content().toString(Charset.defaultCharset()) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 580d36c5..056e0557 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -31,10 +31,12 @@ import reactor.core.publisher.Flux * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { +internal class WireChunkDecoder(private val decoder: WireFrameDecoder, + alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() - fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf) + fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter + .createFlux(decoder, streamBuffer, byteBuf) .doOnSubscribe { logIncomingMessage(byteBuf) } .doOnNext(this::logDecodedWireMessage) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 67a7d6f2..025c59f6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -29,6 +29,7 @@ import java.time.Duration data class ServerConfiguration( val port: Int, val configurationUrl: String, + val configurationUpdateInterval: Duration, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index b2da430d..dd190848 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -20,12 +20,16 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono +import reactor.ipc.netty.http.client.HttpClient +import reactor.test.StepVerifier +import java.time.Duration import java.util.* import kotlin.test.assertEquals @@ -35,20 +39,63 @@ import kotlin.test.assertEquals */ internal object ConsulConfigurationProviderTest : Spek({ + val updateInterval = Duration.ofMillis(1) + val httpAdapterMock: HttpAdapter = mock() + given("valid resource url") { - val testUrl = "http://valid-url/" - val httpAdapterMock: HttpAdapter = mock() - val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock) - whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse())) + val validUrl = "http://valid-url/" + val consulConfigProvider = ConsulConfigurationProvider(validUrl, updateInterval, httpAdapterMock) + + whenever(httpAdapterMock.get(validUrl)).thenReturn(Mono.just(constructConsulResponse())) + + it("should use default configuration at the beginning, " + + "then apply received configuration") { + + StepVerifier.create(consulConfigProvider().take(2)) + .consumeNextWith { + + assertEquals("kafka:9092", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertEquals(Domain.HVRANMEAS, route1.domain) + assertEquals("ves_hvRanMeas", route1.targetTopic) + } + .consumeNextWith { + + assertEquals("kafka:9093", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertEquals(Domain.HEARTBEAT, route1.domain) + assertEquals("test-topic-1", route1.targetTopic) + + val route2 = it.routing.routes[1] + assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route2.domain) + assertEquals("test-topic-2", route2.targetTopic) + }.verifyComplete() + } + } + given("invalid resource url") { + + val invalidUrl = "http://invalid-url/" + val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, updateInterval, httpAdapterMock) + + whenever(httpAdapterMock.get(invalidUrl)).thenReturn(Mono.error(RuntimeException("Test exception"))) + + it("should use default configuration at the beginning, then should interrupt the flux") { - it("should create valid collector configuration") { - val response = consulConfigProvider().blockFirst() - assertEquals("val1", response.kafkaBootstrapServers) - val route = response.routing.routes[0] - assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain) - assertEquals("val3", route.targetTopic) + StepVerifier.create(consulConfigProvider()) + .consumeNextWith { + + + assertEquals("kafka:9092", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertEquals(Domain.HVRANMEAS, route1.domain) + assertEquals("ves_hvRanMeas", route1.targetTopic) + } + .verifyErrorMessage("Test exception") } } }) @@ -56,11 +103,17 @@ internal object ConsulConfigurationProviderTest : Spek({ fun constructConsulResponse(): String { val config = """{ - "kafkaBootstrapServers": "val1", - "routing": { - "fromDomain": 2, - "toTopic": "val3" - } + "kafkaBootstrapServers": "kafka:9093", + "routing": [ + { + "fromDomain": 1, + "toTopic": "test-topic-1" + }, + { + "fromDomain": 2, + "toTopic": "test-topic-2" + } + ] }""" val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt index 66288453..79eda995 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -30,6 +30,7 @@ import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.ipc.netty.http.client.HttpClient import reactor.ipc.netty.http.client.HttpClientResponse +import reactor.test.StepVerifier import java.nio.charset.Charset import kotlin.test.assertEquals @@ -43,13 +44,15 @@ internal object HttpAdapterTest : Spek({ val httpClientMock: HttpClient = mock() val httpAdapter = HttpAdapter(httpClientMock) - val testUrl = "http://valid-url/" + val validUrl = "http://valid-url/" val responseContent = """{"key1": "value1", "key2": "value2"}""" val httpResponse = createHttpResponseMock(responseContent) - whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse)) + whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse)) it("should return response string") { - assertEquals(responseContent, httpAdapter.getResponse(testUrl).block()) + StepVerifier + .create(httpAdapter.get(validUrl)) + .expectNext(responseContent) } } @@ -57,12 +60,14 @@ internal object HttpAdapterTest : Spek({ val httpClientMock: HttpClient = mock() val httpAdapter = HttpAdapter(httpClientMock) - val testUrl = "http://invalid-url/" - whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception"))) + val invalidUrl = "http://invalid-url/" + val exceptionMessage = "Test exception" + whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage))) - - it("should return null response") { - assertEquals(null, httpAdapter.getResponse(testUrl).block()) + it("should interrupt the flux") { + StepVerifier + .create(httpAdapter.get(invalidUrl)) + .verifyErrorMessage(exceptionMessage) } } }) diff --git a/hv-collector-main/Dockerfile b/hv-collector-main/Dockerfile index 1367ff1c..cab61dc3 100644 --- a/hv-collector-main/Dockerfile +++ b/hv-collector-main/Dockerfile @@ -9,7 +9,7 @@ EXPOSE 6061 WORKDIR /opt/ves-hv-collector ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] -CMD ["--listen-port", "6061"] +CMD ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ COPY target/hv-collector-main-*.jar ./ diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt index f3e97be2..63de270e 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt @@ -24,17 +24,21 @@ import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.UPDATE_INTERVAL +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE + import java.time.Duration internal object DefaultValues { const val PORT = 6061 + const val UPDATE_INTERVAL = 300L const val CONFIG_URL = "" const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key" const val CERT_FILE = "/etc/ves-hv/server.crt" @@ -45,6 +49,7 @@ internal object DefaultValues { internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { override val cmdLineOptionsList = listOf( LISTEN_PORT, + UPDATE_INTERVAL, CONSUL_CONFIG_URL, PRIVATE_KEY_FILE, CERT_FILE, @@ -56,12 +61,14 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration { val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT) val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL) + val updateInterval = cmdLine.longValue(UPDATE_INTERVAL, DefaultValues.UPDATE_INTERVAL) val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) val dummyMode = cmdLine.hasOption(DUMMY_MODE) val security = createSecurityConfiguration(cmdLine) return ServerConfiguration( port = port, configurationUrl = configUrl, + configurationUpdateInterval = Duration.ofSeconds(updateInterval), securityConfiguration = security, idleTimeout = Duration.ofSeconds(idleTimeoutSec), dummyMode = dummyMode) diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 074a75e4..d1c3b4a7 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -20,17 +20,13 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.flatMap -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain private val logger = Logger("org.onap.dcae.collectors.veshv.main") private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt" @@ -55,7 +51,8 @@ fun main(args: Array<String>) { private fun createServer(config: ServerConfiguration): Server { val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() val collectorProvider = CollectorFactory( - resolveConfigurationProvider(config), + AdapterFactory.consulConfigurationProvider( + config.configurationUrl, config.configurationUpdateInterval), sink, MicrometerMetrics() ).createVesHvCollectorProvider() @@ -63,23 +60,3 @@ private fun createServer(config: ServerConfiguration): Server { return ServerFactory.createNettyTcpServer(config, collectorProvider) } -private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider { - - if (serverConfiguration.configurationUrl.isEmpty()) { - logger.info("Configuration url not specified - using default config") - val sampleConfig = CollectorConfiguration( - kafkaBootstrapServers = "kafka:9092", - routing = routing { - defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic("ves_hvRanMeas") - withFixedPartitioning() - } - }.build() - ) - return AdapterFactory.staticConfigurationProvider(sampleConfig) - } - - logger.info("Using configuration url: ${serverConfiguration.configurationUrl}") - return AdapterFactory.consulConfigurationProvider(serverConfiguration.configurationUrl) -} diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt index 4c2425bc..a14801da 100644 --- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt @@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.model.ServerConfiguration import java.nio.file.Paths +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,6 +39,8 @@ import java.nio.file.Paths object ArgBasedServerConfigurationTest : Spek({ lateinit var cut: ArgBasedServerConfiguration val configurationUrl = "http://test-address/test" + val listenPort = "6969" + val updateInterval = "10" val pk = Paths.get("/", "etc", "ves", "pk.pem") val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt") val trustCert = Paths.get("/", "etc", "ves", "trusted.crt") @@ -59,8 +62,9 @@ object ArgBasedServerConfigurationTest : Spek({ lateinit var result: ServerConfiguration beforeEachTest { - result = parse("--listen-port", "6969", + result = parse("--listen-port", listenPort, "--config-url", configurationUrl, + "--update-interval", updateInterval, "--private-key-file", pk.toFile().absolutePath, "--cert-file", cert.toFile().absolutePath, "--trust-cert-file", trustCert.toFile().absolutePath) @@ -70,6 +74,10 @@ object ArgBasedServerConfigurationTest : Spek({ assertThat(result.port).isEqualTo(6969) } + it("should set update interval") { + assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(10)) + } + it("should set proper config url") { assertThat(result.configurationUrl).isEqualTo(configurationUrl) } @@ -112,6 +120,10 @@ object ArgBasedServerConfigurationTest : Spek({ assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL) } + it("should set default update interval") { + assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(DefaultValues.UPDATE_INTERVAL)) + } + on("security config") { val securityConfiguration = result.securityConfiguration diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt index 942ca31f..b20f2aab 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt @@ -35,6 +35,12 @@ enum class CommandLineOption(val option: Option) { .desc("URL of ves configuration on consul") .build() ), + UPDATE_INTERVAL(Option.builder("I") + .longOpt("update-interval") + .hasArg() + .desc("Consul configuration update interval in seconds") + .build() + ), VES_HV_PORT(Option.builder("p") .longOpt("ves-port") .required() |