aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt7
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt12
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt61
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt83
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt21
-rw-r--r--hv-collector-main/Dockerfile2
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt15
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt27
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt14
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt6
14 files changed, 184 insertions, 97 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 65951edc..d98971ba 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -20,6 +20,14 @@ services:
depends_on:
- zookeeper
+ consul:
+ image: progrium/consul
+ ports:
+ - "8500:8500"
+ environment:
+ - CONSUL_BIND_INTERFACE=eth0
+ command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
+
ves-hv-collector:
image: onap/ves-hv-collector
# build:
@@ -29,6 +37,7 @@ services:
- "6061:6061/tcp"
depends_on:
- kafka
+ - consul
volumes:
- ./ssl/:/etc/ves-hv/
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 8785180b..06047fd4 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -49,7 +49,9 @@ class CollectorFactory(val configuration: ConfigurationProvider,
}
private fun createVesHvCollector(): Flux<Collector> =
- configuration().map(this::createVesHvCollector)
+ configuration()
+ .doOnError { System.exit(ERROR_CODE) }
+ .map(this::createVesHvCollector)
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
@@ -61,5 +63,8 @@ class CollectorFactory(val configuration: ConfigurationProvider,
metrics = metrics)
}
+ companion object {
+ const val ERROR_CODE = 3
+ }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 358be108..d9e7432d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import reactor.core.publisher.Flux
import reactor.ipc.netty.http.client.HttpClient
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -34,13 +33,8 @@ object AdapterFactory {
fun kafkaSink(): SinkProvider = KafkaSinkProvider()
fun loggingSink(): SinkProvider = LoggingSinkProvider()
- fun staticConfigurationProvider(config: CollectorConfiguration) =
- object : ConfigurationProvider {
- override fun invoke() = Flux.just(config)
- }
-
- fun consulConfigurationProvider(url: String): ConfigurationProvider =
- ConsulConfigurationProvider(url, httpAdapter())
+ fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider =
+ ConsulConfigurationProvider(url, updateInterval, httpAdapter())
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 04e4927d..c70d128a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import java.io.StringReader
import java.time.Duration
-import java.util.*
+import java.util.Base64
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
@@ -36,20 +38,39 @@ import javax.json.JsonObject
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
- : ConfigurationProvider {
+internal class ConsulConfigurationProvider(private val url: String,
+ private val updateInterval: Duration,
+ private val http: HttpAdapter
+) : ConfigurationProvider {
private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
override fun invoke(): Flux<CollectorConfiguration> =
- Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
- .flatMap { http.getResponse(url) }
- .filter { body -> body.hashCode() != lastConfigurationHash.get() }
- .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
- .map { str -> getConfigurationJson(str) }
- .map { json -> createCollectorConfiguration(json) }
+ Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
+
+ private fun createDefaultConfigurationFlux(): Flux<CollectorConfiguration> = Flux.just(
+ CollectorConfiguration(
+ kafkaBootstrapServers = "kafka:9092",
+ routing = routing {
+ defineRoute {
+ fromDomain(HVRANMEAS)
+ toTopic("ves_hvRanMeas")
+ withFixedPartitioning()
+ }
+ }.build())
+ ).doOnNext { logger.info("Applied default configuration") }
+
+ private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux.interval(updateInterval)
+ .flatMap { http.get(url) }
+ .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " +
+ "Shutting down..") }
+ .filter { it.hashCode() != lastConfigurationHash.get() }
+ .doOnNext { lastConfigurationHash.set(it.hashCode()) }
+ .map { getConfigurationJson(it) }
+ .map { createCollectorConfiguration(it) }
+
private fun getConfigurationJson(str: String): JsonObject {
val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
@@ -60,23 +81,21 @@ internal class ConsulConfigurationProvider(private val url: String, private val
}
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
-
- val routing = configuration.getJsonObject("routing")
+ val routing = configuration.getJsonArray("routing")
return CollectorConfiguration(
kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
routing = org.onap.dcae.collectors.veshv.model.routing {
- defineRoute {
- fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
- toTopic(routing.getString("toTopic"))
- withFixedPartitioning()
+ for (route in routing) {
+ val routeObj = route.asJsonObject()
+ defineRoute {
+ fromDomain(forNumber(routeObj.getInt("fromDomain")))
+ toTopic(routeObj.getString("toTopic"))
+ withFixedPartitioning()
+ }
}
}.build()
)
}
-
- companion object {
- private const val REFRESH_INTERVAL_MINUTES: Long = 5
- private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
- }
}
+
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 236e3cb7..a41cd09f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -34,15 +34,12 @@ open class HttpAdapter(private val httpClient: HttpClient) {
private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
- open fun getResponse(url: String): Mono<String> =
+ open fun get(url: String): Mono<String> =
httpClient.get(url)
- .onErrorResume { e -> unableToGetResource(e, url) }
- .flatMap { res -> res.receiveContent().toMono() }
- .map { content -> content.content().toString(Charset.defaultCharset()) }
-
-
- private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> {
- logger.info("Failed to get resource on path: $url\n${e.localizedMessage}")
- return Mono.empty()
- }
+ .doOnError {
+ logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
+ logger.debug("Nested exception:", it)
+ }
+ .flatMap { it.receiveContent().toMono() }
+ .map { it.content().toString(Charset.defaultCharset()) }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 580d36c5..056e0557 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -31,10 +31,12 @@ import reactor.core.publisher.Flux
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(private val decoder: WireFrameDecoder,
+ alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
private val streamBuffer = alloc.compositeBuffer()
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf)
+ fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
+ .createFlux(decoder, streamBuffer, byteBuf)
.doOnSubscribe { logIncomingMessage(byteBuf) }
.doOnNext(this::logDecodedWireMessage)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 67a7d6f2..025c59f6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -29,6 +29,7 @@ import java.time.Duration
data class ServerConfiguration(
val port: Int,
val configurationUrl: String,
+ val configurationUpdateInterval: Duration,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index b2da430d..dd190848 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -20,12 +20,16 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.test.StepVerifier
+import java.time.Duration
import java.util.*
import kotlin.test.assertEquals
@@ -35,20 +39,63 @@ import kotlin.test.assertEquals
*/
internal object ConsulConfigurationProviderTest : Spek({
+ val updateInterval = Duration.ofMillis(1)
+ val httpAdapterMock: HttpAdapter = mock()
+
given("valid resource url") {
- val testUrl = "http://valid-url/"
- val httpAdapterMock: HttpAdapter = mock()
- val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock)
- whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse()))
+ val validUrl = "http://valid-url/"
+ val consulConfigProvider = ConsulConfigurationProvider(validUrl, updateInterval, httpAdapterMock)
+
+ whenever(httpAdapterMock.get(validUrl)).thenReturn(Mono.just(constructConsulResponse()))
+
+ it("should use default configuration at the beginning, " +
+ "then apply received configuration") {
+
+ StepVerifier.create(consulConfigProvider().take(2))
+ .consumeNextWith {
+
+ assertEquals("kafka:9092", it.kafkaBootstrapServers)
+
+ val route1 = it.routing.routes[0]
+ assertEquals(Domain.HVRANMEAS, route1.domain)
+ assertEquals("ves_hvRanMeas", route1.targetTopic)
+ }
+ .consumeNextWith {
+
+ assertEquals("kafka:9093", it.kafkaBootstrapServers)
+
+ val route1 = it.routing.routes[0]
+ assertEquals(Domain.HEARTBEAT, route1.domain)
+ assertEquals("test-topic-1", route1.targetTopic)
+
+ val route2 = it.routing.routes[1]
+ assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route2.domain)
+ assertEquals("test-topic-2", route2.targetTopic)
+ }.verifyComplete()
+ }
+ }
+ given("invalid resource url") {
+
+ val invalidUrl = "http://invalid-url/"
+ val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, updateInterval, httpAdapterMock)
+
+ whenever(httpAdapterMock.get(invalidUrl)).thenReturn(Mono.error(RuntimeException("Test exception")))
+
+ it("should use default configuration at the beginning, then should interrupt the flux") {
- it("should create valid collector configuration") {
- val response = consulConfigProvider().blockFirst()
- assertEquals("val1", response.kafkaBootstrapServers)
- val route = response.routing.routes[0]
- assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain)
- assertEquals("val3", route.targetTopic)
+ StepVerifier.create(consulConfigProvider())
+ .consumeNextWith {
+
+
+ assertEquals("kafka:9092", it.kafkaBootstrapServers)
+
+ val route1 = it.routing.routes[0]
+ assertEquals(Domain.HVRANMEAS, route1.domain)
+ assertEquals("ves_hvRanMeas", route1.targetTopic)
+ }
+ .verifyErrorMessage("Test exception")
}
}
})
@@ -56,11 +103,17 @@ internal object ConsulConfigurationProviderTest : Spek({
fun constructConsulResponse(): String {
val config = """{
- "kafkaBootstrapServers": "val1",
- "routing": {
- "fromDomain": 2,
- "toTopic": "val3"
- }
+ "kafkaBootstrapServers": "kafka:9093",
+ "routing": [
+ {
+ "fromDomain": 1,
+ "toTopic": "test-topic-1"
+ },
+ {
+ "fromDomain": 2,
+ "toTopic": "test-topic-2"
+ }
+ ]
}"""
val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
index 66288453..79eda995 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -30,6 +30,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.ipc.netty.http.client.HttpClient
import reactor.ipc.netty.http.client.HttpClientResponse
+import reactor.test.StepVerifier
import java.nio.charset.Charset
import kotlin.test.assertEquals
@@ -43,13 +44,15 @@ internal object HttpAdapterTest : Spek({
val httpClientMock: HttpClient = mock()
val httpAdapter = HttpAdapter(httpClientMock)
- val testUrl = "http://valid-url/"
+ val validUrl = "http://valid-url/"
val responseContent = """{"key1": "value1", "key2": "value2"}"""
val httpResponse = createHttpResponseMock(responseContent)
- whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse))
+ whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
it("should return response string") {
- assertEquals(responseContent, httpAdapter.getResponse(testUrl).block())
+ StepVerifier
+ .create(httpAdapter.get(validUrl))
+ .expectNext(responseContent)
}
}
@@ -57,12 +60,14 @@ internal object HttpAdapterTest : Spek({
val httpClientMock: HttpClient = mock()
val httpAdapter = HttpAdapter(httpClientMock)
- val testUrl = "http://invalid-url/"
- whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception")))
+ val invalidUrl = "http://invalid-url/"
+ val exceptionMessage = "Test exception"
+ whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
-
- it("should return null response") {
- assertEquals(null, httpAdapter.getResponse(testUrl).block())
+ it("should interrupt the flux") {
+ StepVerifier
+ .create(httpAdapter.get(invalidUrl))
+ .verifyErrorMessage(exceptionMessage)
}
}
})
diff --git a/hv-collector-main/Dockerfile b/hv-collector-main/Dockerfile
index 1367ff1c..cab61dc3 100644
--- a/hv-collector-main/Dockerfile
+++ b/hv-collector-main/Dockerfile
@@ -9,7 +9,7 @@ EXPOSE 6061
WORKDIR /opt/ves-hv-collector
ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
-CMD ["--listen-port", "6061"]
+CMD ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
COPY target/hv-collector-main-*.jar ./
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
index f3e97be2..63de270e 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
@@ -24,17 +24,21 @@ import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.UPDATE_INTERVAL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+
import java.time.Duration
internal object DefaultValues {
const val PORT = 6061
+ const val UPDATE_INTERVAL = 300L
const val CONFIG_URL = ""
const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
const val CERT_FILE = "/etc/ves-hv/server.crt"
@@ -45,6 +49,7 @@ internal object DefaultValues {
internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
LISTEN_PORT,
+ UPDATE_INTERVAL,
CONSUL_CONFIG_URL,
PRIVATE_KEY_FILE,
CERT_FILE,
@@ -56,12 +61,14 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+ val updateInterval = cmdLine.longValue(UPDATE_INTERVAL, DefaultValues.UPDATE_INTERVAL)
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine)
return ServerConfiguration(
port = port,
configurationUrl = configUrl,
+ configurationUpdateInterval = Duration.ofSeconds(updateInterval),
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
dummyMode = dummyMode)
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 074a75e4..d1c3b4a7 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -20,17 +20,13 @@
package org.onap.dcae.collectors.veshv.main
import arrow.core.flatMap
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
private val logger = Logger("org.onap.dcae.collectors.veshv.main")
private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
@@ -55,7 +51,8 @@ fun main(args: Array<String>) {
private fun createServer(config: ServerConfiguration): Server {
val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
- resolveConfigurationProvider(config),
+ AdapterFactory.consulConfigurationProvider(
+ config.configurationUrl, config.configurationUpdateInterval),
sink,
MicrometerMetrics()
).createVesHvCollectorProvider()
@@ -63,23 +60,3 @@ private fun createServer(config: ServerConfiguration): Server {
return ServerFactory.createNettyTcpServer(config, collectorProvider)
}
-private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider {
-
- if (serverConfiguration.configurationUrl.isEmpty()) {
- logger.info("Configuration url not specified - using default config")
- val sampleConfig = CollectorConfiguration(
- kafkaBootstrapServers = "kafka:9092",
- routing = routing {
- defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic("ves_hvRanMeas")
- withFixedPartitioning()
- }
- }.build()
- )
- return AdapterFactory.staticConfigurationProvider(sampleConfig)
- }
-
- logger.info("Using configuration url: ${serverConfiguration.configurationUrl}")
- return AdapterFactory.consulConfigurationProvider(serverConfiguration.configurationUrl)
-}
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
index 4c2425bc..a14801da 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
@@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import java.nio.file.Paths
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,6 +39,8 @@ import java.nio.file.Paths
object ArgBasedServerConfigurationTest : Spek({
lateinit var cut: ArgBasedServerConfiguration
val configurationUrl = "http://test-address/test"
+ val listenPort = "6969"
+ val updateInterval = "10"
val pk = Paths.get("/", "etc", "ves", "pk.pem")
val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt")
val trustCert = Paths.get("/", "etc", "ves", "trusted.crt")
@@ -59,8 +62,9 @@ object ArgBasedServerConfigurationTest : Spek({
lateinit var result: ServerConfiguration
beforeEachTest {
- result = parse("--listen-port", "6969",
+ result = parse("--listen-port", listenPort,
"--config-url", configurationUrl,
+ "--update-interval", updateInterval,
"--private-key-file", pk.toFile().absolutePath,
"--cert-file", cert.toFile().absolutePath,
"--trust-cert-file", trustCert.toFile().absolutePath)
@@ -70,6 +74,10 @@ object ArgBasedServerConfigurationTest : Spek({
assertThat(result.port).isEqualTo(6969)
}
+ it("should set update interval") {
+ assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(10))
+ }
+
it("should set proper config url") {
assertThat(result.configurationUrl).isEqualTo(configurationUrl)
}
@@ -112,6 +120,10 @@ object ArgBasedServerConfigurationTest : Spek({
assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
}
+ it("should set default update interval") {
+ assertThat(result.configurationUpdateInterval).isEqualTo(Duration.ofSeconds(DefaultValues.UPDATE_INTERVAL))
+ }
+
on("security config") {
val securityConfiguration = result.securityConfiguration
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 942ca31f..b20f2aab 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -35,6 +35,12 @@ enum class CommandLineOption(val option: Option) {
.desc("URL of ves configuration on consul")
.build()
),
+ UPDATE_INTERVAL(Option.builder("I")
+ .longOpt("update-interval")
+ .hasArg()
+ .desc("Consul configuration update interval in seconds")
+ .build()
+ ),
VES_HV_PORT(Option.builder("p")
.longOpt("ves-port")
.required()