summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt10
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt33
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt3
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt5
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt21
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt4
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt31
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt8
-rw-r--r--pom.xml2
10 files changed, 107 insertions, 40 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 2a8a3960..11a0e9bd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import reactor.ipc.netty.http.client.HttpClient
import java.time.Duration
@@ -33,8 +34,13 @@ object AdapterFactory {
fun kafkaSink(): SinkProvider = KafkaSinkProvider()
fun loggingSink(): SinkProvider = LoggingSinkProvider()
- fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider =
- ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay)
+ fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConsulConfigurationProvider(
+ configurationProviderParams.configurationUrl,
+ httpAdapter(),
+ configurationProviderParams.firstRequestDelay,
+ configurationProviderParams.requestInterval
+ )
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 621c63f8..aca0e7e9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -41,10 +41,10 @@ import javax.json.JsonObject
*/
internal class ConsulConfigurationProvider(private val url: String,
private val http: HttpAdapter,
- private val firstRequestDelay: Duration
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration
) : ConfigurationProvider {
- private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
override fun invoke(): Flux<CollectorConfiguration> =
@@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String,
}.build())
).doOnNext { logger.info("Applied default configuration") }
- private fun createConsulFlux(): Flux<CollectorConfiguration> =
- http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
- .doOnError {
- logger.error("Encountered an error " +
- "when trying to acquire configuration from consul. Shutting down..")
- }
- .map(::parseJsonResponse)
- .doOnNext(::updateModifyIndex)
- .map(::extractEncodedConfiguration)
- .flatMap(::filterDifferentValues)
- .map(::decodeConfiguration)
- .map(::createCollectorConfiguration)
- .repeat()
- .delaySubscription(firstRequestDelay)
+ private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
+ .interval(firstRequestDelay, requestInterval)
+ .flatMap { http.get(url) }
+ .doOnError {
+ logger.error("Encountered an error " +
+ "when trying to acquire configuration from consul. Shutting down..")
+ }
+ .map(::parseJsonResponse)
+ .map(::extractEncodedConfiguration)
+ .flatMap(::filterDifferentValues)
+ .map(::decodeConfiguration)
+ .map(::createCollectorConfiguration)
private fun parseJsonResponse(responseString: String): JsonObject =
Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
- private fun updateModifyIndex(response: JsonObject) =
- lastModifyIndex.set(response.getInt("ModifyIndex"))
-
private fun extractEncodedConfiguration(response: JsonObject): String =
response.getString("Value")
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
new file mode 100644
index 00000000..9de34498
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+data class ConfigurationProviderParams(val configurationUrl: String,
+ val firstRequestDelay: Duration,
+ val requestInterval: Duration)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index a486996e..93ad719d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -28,8 +28,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val port: Int,
- val configurationUrl: String,
- val firstRequestDelay: Duration,
+ val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 322ec4e8..808a6fcc 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -41,11 +41,12 @@ internal object ConsulConfigurationProviderTest : Spek({
val httpAdapterMock: HttpAdapter = mock()
val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
given("valid resource url") {
val validUrl = "http://valid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay)
+ val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval)
whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
.thenReturn(Mono.just(constructConsulResponse()))
@@ -80,7 +81,7 @@ internal object ConsulConfigurationProviderTest : Spek({
given("invalid resource url") {
val invalidUrl = "http://invalid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay)
+ val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval)
whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
.thenReturn(Mono.error(RuntimeException("Test exception")))
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
index 35ca09d8..a11fe3d4 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
@@ -30,6 +31,7 @@ import java.time.Duration
internal object DefaultValues {
const val PORT = 6061
const val CONSUL_FIRST_REQUEST_DELAY = 10L
+ const val CONSUL_REQUEST_INTERVAL = 5L
const val CONFIG_URL = ""
const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
const val CERT_FILE = "/etc/ves-hv/server.crt"
@@ -42,6 +44,7 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
LISTEN_PORT,
CONSUL_CONFIG_URL,
CONSUL_FIRST_REQUEST_DELAY,
+ CONSUL_REQUEST_INTERVAL,
SSL_DISABLE,
PRIVATE_KEY_FILE,
CERT_FILE,
@@ -52,20 +55,30 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
- val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine)
+ val configurationProviderParams = createConfigurationProviderParams(cmdLine);
return ServerConfiguration(
port = port,
- configurationUrl = configUrl,
- firstRequestDelay = Duration.ofSeconds(firstRequestDelay),
+ configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
dummyMode = dummyMode)
}
+ private fun createConfigurationProviderParams(cmdLine: CommandLine): ConfigurationProviderParams {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+ val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
+ val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL)
+
+ return ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }
+
private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
val sslDisable = cmdLine.hasOption(SSL_DISABLE)
val pkFile = cmdLine.stringValue(PRIVATE_KEY_FILE, DefaultValues.PRIVATE_KEY_FILE)
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 8418cd78..aa1f67b1 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -54,7 +54,9 @@ fun main(args: Array<String>) =
private fun createServer(config: ServerConfiguration): Server {
val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationUrl, config.firstRequestDelay), sink, MicrometerMetrics()
+ AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ sink,
+ MicrometerMetrics()
).createVesHvCollectorProvider()
return ServerFactory.createNettyTcpServer(config, collectorProvider)
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
index 0498344c..6da605f4 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
@@ -39,6 +39,7 @@ object ArgBasedServerConfigurationTest : Spek({
lateinit var cut: ArgBasedServerConfiguration
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
+ val requestInterval = "5"
val listenPort = "6969"
val pk = Paths.get("/", "etc", "ves", "pk.pem")
val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt")
@@ -63,6 +64,7 @@ object ArgBasedServerConfigurationTest : Spek({
"--listen-port", listenPort,
"--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
"--private-key-file", pk.toFile().absolutePath,
"--cert-file", cert.toFile().absolutePath,
"--trust-cert-file", trustCert.toFile().absolutePath)
@@ -73,11 +75,18 @@ object ArgBasedServerConfigurationTest : Spek({
}
it("should set proper first consul request delay") {
- assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+ assertThat(result.configurationProviderParams.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(10))
+ }
+
+ it("should set proper consul request interval") {
+ assertThat(result.configurationProviderParams.requestInterval)
+ .isEqualTo(Duration.ofSeconds(5))
}
it("should set proper config url") {
- assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(configurationUrl)
}
it("should set proper security configuration") {
@@ -85,8 +94,6 @@ object ArgBasedServerConfigurationTest : Spek({
SecurityConfiguration(sslDisable = true, privateKey = pk, cert = cert, trustedCert = trustCert)
)
}
-
-
}
given("some parameters are present in the short form") {
@@ -101,11 +108,13 @@ object ArgBasedServerConfigurationTest : Spek({
}
it("should set proper first consul request delay") {
- assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+ assertThat(result.configurationProviderParams.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(10))
}
it("should set proper config url") {
- assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(configurationUrl)
}
}
@@ -121,14 +130,20 @@ object ArgBasedServerConfigurationTest : Spek({
}
it("should set default config url") {
- assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(DefaultValues.CONFIG_URL)
}
it("should set default first consul request delay") {
- assertThat(result.firstRequestDelay)
+ assertThat(result.configurationProviderParams.firstRequestDelay)
.isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_FIRST_REQUEST_DELAY))
}
+ it("should set default consul request interval") {
+ assertThat(result.configurationProviderParams.requestInterval)
+ .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_REQUEST_INTERVAL))
+ }
+
on("security config") {
val securityConfiguration = result.securityConfiguration
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 27213c95..a654868e 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -41,6 +41,12 @@ enum class CommandLineOption(val option: Option) {
.desc("Delay of first request to consul in seconds")
.build()
),
+ CONSUL_REQUEST_INTERVAL(Option.builder("I")
+ .longOpt("request-interval")
+ .hasArg()
+ .desc("Interval of consul configuration requests in seconds")
+ .build()
+ ),
VES_HV_PORT(Option.builder("p")
.longOpt("ves-port")
.required()
@@ -105,7 +111,7 @@ enum class CommandLineOption(val option: Option) {
|connection might be closed.""".trimMargin())
.build()
),
- DUMMY_MODE(Option.builder("d")
+ DUMMY_MODE(Option.builder("u")
.longOpt("dummy")
.desc("If present will start in dummy mode (dummy external services)")
.build()
diff --git a/pom.xml b/pom.xml
index 45fa4968..ef531bfb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,8 +71,8 @@
<failIfMissingComponentTests>false</failIfMissingComponentTests>
<skipAnalysis>true</skipAnalysis>
- <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
<!-- Docker -->
+ <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
<onap.nexus.dockerregistry.daily>nexus3.onap.org:10003</onap.nexus.dockerregistry.daily>
<onap.nexus.dockerregistry.release>nexus3.onap.org:10002</onap.nexus.dockerregistry.release>
<docker-image.registry>${onap.nexus.dockerregistry.daily}</docker-image.registry>