summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-07-20 16:37:02 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-03 08:31:09 +0200
commita2d18b375631d010432089ed18db327c9e4f26bf (patch)
treead67ef481839ec7c81fb03daec7990faf715cf20
parentf4a58fbdbcaaba92a4daae0e2807536c3da4c857 (diff)
Fix consul request timeout issue
Fix timeout issue when using consul blocking query calls by switching to standard requests peformed in given interval Closes ONAP-628 Change-Id: Ifaf7ddfa27045015a7a90c178e0d6d38955c0c58 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-601
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt10
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt33
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt3
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt5
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt21
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt4
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt31
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt8
-rw-r--r--pom.xml2
10 files changed, 107 insertions, 40 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 2a8a3960..11a0e9bd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import reactor.ipc.netty.http.client.HttpClient
import java.time.Duration
@@ -33,8 +34,13 @@ object AdapterFactory {
fun kafkaSink(): SinkProvider = KafkaSinkProvider()
fun loggingSink(): SinkProvider = LoggingSinkProvider()
- fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider =
- ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay)
+ fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConsulConfigurationProvider(
+ configurationProviderParams.configurationUrl,
+ httpAdapter(),
+ configurationProviderParams.firstRequestDelay,
+ configurationProviderParams.requestInterval
+ )
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 621c63f8..aca0e7e9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -41,10 +41,10 @@ import javax.json.JsonObject
*/
internal class ConsulConfigurationProvider(private val url: String,
private val http: HttpAdapter,
- private val firstRequestDelay: Duration
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration
) : ConfigurationProvider {
- private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
override fun invoke(): Flux<CollectorConfiguration> =
@@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String,
}.build())
).doOnNext { logger.info("Applied default configuration") }
- private fun createConsulFlux(): Flux<CollectorConfiguration> =
- http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
- .doOnError {
- logger.error("Encountered an error " +
- "when trying to acquire configuration from consul. Shutting down..")
- }
- .map(::parseJsonResponse)
- .doOnNext(::updateModifyIndex)
- .map(::extractEncodedConfiguration)
- .flatMap(::filterDifferentValues)
- .map(::decodeConfiguration)
- .map(::createCollectorConfiguration)
- .repeat()
- .delaySubscription(firstRequestDelay)
+ private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
+ .interval(firstRequestDelay, requestInterval)
+ .flatMap { http.get(url) }
+ .doOnError {
+ logger.error("Encountered an error " +
+ "when trying to acquire configuration from consul. Shutting down..")
+ }
+ .map(::parseJsonResponse)
+ .map(::extractEncodedConfiguration)
+ .flatMap(::filterDifferentValues)
+ .map(::decodeConfiguration)
+ .map(::createCollectorConfiguration)
private fun parseJsonResponse(responseString: String): JsonObject =
Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
- private fun updateModifyIndex(response: JsonObject) =
- lastModifyIndex.set(response.getInt("ModifyIndex"))
-
private fun extractEncodedConfiguration(response: JsonObject): String =
response.getString("Value")
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
new file mode 100644
index 00000000..9de34498
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+data class ConfigurationProviderParams(val configurationUrl: String,
+ val firstRequestDelay: Duration,
+ val requestInterval: Duration)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index a486996e..93ad719d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -28,8 +28,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val port: Int,
- val configurationUrl: String,
- val firstRequestDelay: Duration,
+ val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 322ec4e8..808a6fcc 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -41,11 +41,12 @@ internal object ConsulConfigurationProviderTest : Spek({
val httpAdapterMock: HttpAdapter = mock()
val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
given("valid resource url") {
val validUrl = "http://valid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay)
+ val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval)
whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
.thenReturn(Mono.just(constructConsulResponse()))
@@ -80,7 +81,7 @@ internal object ConsulConfigurationProviderTest : Spek({
given("invalid resource url") {
val invalidUrl = "http://invalid-url/"
- val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay)
+ val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval)
whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
.thenReturn(Mono.error(RuntimeException("Test exception")))
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
index 35ca09d8..a11fe3d4 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.*
@@ -30,6 +31,7 @@ import java.time.Duration
internal object DefaultValues {
const val PORT = 6061
const val CONSUL_FIRST_REQUEST_DELAY = 10L
+ const val CONSUL_REQUEST_INTERVAL = 5L
const val CONFIG_URL = ""
const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
const val CERT_FILE = "/etc/ves-hv/server.crt"
@@ -42,6 +44,7 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
LISTEN_PORT,
CONSUL_CONFIG_URL,
CONSUL_FIRST_REQUEST_DELAY,
+ CONSUL_REQUEST_INTERVAL,
SSL_DISABLE,
PRIVATE_KEY_FILE,
CERT_FILE,
@@ -52,20 +55,30 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
- val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine)
+ val configurationProviderParams = createConfigurationProviderParams(cmdLine);
return ServerConfiguration(
port = port,
- configurationUrl = configUrl,
- firstRequestDelay = Duration.ofSeconds(firstRequestDelay),
+ configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
dummyMode = dummyMode)
}
+ private fun createConfigurationProviderParams(cmdLine: CommandLine): ConfigurationProviderParams {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+ val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
+ val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL)
+
+ return ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }
+
private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
val sslDisable = cmdLine.hasOption(SSL_DISABLE)
val pkFile = cmdLine.stringValue(PRIVATE_KEY_FILE, DefaultValues.PRIVATE_KEY_FILE)
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 8418cd78..aa1f67b1 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -54,7 +54,9 @@ fun main(args: Array<String>) =
private fun createServer(config: ServerConfiguration): Server {
val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationUrl, config.firstRequestDelay), sink, MicrometerMetrics()
+ AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ sink,
+ MicrometerMetrics()
).createVesHvCollectorProvider()
return ServerFactory.createNettyTcpServer(config, collectorProvider)
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
index 0498344c..6da605f4 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
@@ -39,6 +39,7 @@ object ArgBasedServerConfigurationTest : Spek({
lateinit var cut: ArgBasedServerConfiguration
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
+ val requestInterval = "5"
val listenPort = "6969"
val pk = Paths.get("/", "etc", "ves", "pk.pem")
val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt")
@@ -63,6 +64,7 @@ object ArgBasedServerConfigurationTest : Spek({
"--listen-port", listenPort,
"--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
+ "--request-interval", requestInterval,
"--private-key-file", pk.toFile().absolutePath,
"--cert-file", cert.toFile().absolutePath,
"--trust-cert-file", trustCert.toFile().absolutePath)
@@ -73,11 +75,18 @@ object ArgBasedServerConfigurationTest : Spek({
}
it("should set proper first consul request delay") {
- assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+ assertThat(result.configurationProviderParams.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(10))
+ }
+
+ it("should set proper consul request interval") {
+ assertThat(result.configurationProviderParams.requestInterval)
+ .isEqualTo(Duration.ofSeconds(5))
}
it("should set proper config url") {
- assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(configurationUrl)
}
it("should set proper security configuration") {
@@ -85,8 +94,6 @@ object ArgBasedServerConfigurationTest : Spek({
SecurityConfiguration(sslDisable = true, privateKey = pk, cert = cert, trustedCert = trustCert)
)
}
-
-
}
given("some parameters are present in the short form") {
@@ -101,11 +108,13 @@ object ArgBasedServerConfigurationTest : Spek({
}
it("should set proper first consul request delay") {
- assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10))
+ assertThat(result.configurationProviderParams.firstRequestDelay)
+ .isEqualTo(Duration.ofSeconds(10))
}
it("should set proper config url") {
- assertThat(result.configurationUrl).isEqualTo(configurationUrl)
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(configurationUrl)
}
}
@@ -121,14 +130,20 @@ object ArgBasedServerConfigurationTest : Spek({
}
it("should set default config url") {
- assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL)
+ assertThat(result.configurationProviderParams.configurationUrl)
+ .isEqualTo(DefaultValues.CONFIG_URL)
}
it("should set default first consul request delay") {
- assertThat(result.firstRequestDelay)
+ assertThat(result.configurationProviderParams.firstRequestDelay)
.isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_FIRST_REQUEST_DELAY))
}
+ it("should set default consul request interval") {
+ assertThat(result.configurationProviderParams.requestInterval)
+ .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_REQUEST_INTERVAL))
+ }
+
on("security config") {
val securityConfiguration = result.securityConfiguration
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 27213c95..a654868e 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -41,6 +41,12 @@ enum class CommandLineOption(val option: Option) {
.desc("Delay of first request to consul in seconds")
.build()
),
+ CONSUL_REQUEST_INTERVAL(Option.builder("I")
+ .longOpt("request-interval")
+ .hasArg()
+ .desc("Interval of consul configuration requests in seconds")
+ .build()
+ ),
VES_HV_PORT(Option.builder("p")
.longOpt("ves-port")
.required()
@@ -105,7 +111,7 @@ enum class CommandLineOption(val option: Option) {
|connection might be closed.""".trimMargin())
.build()
),
- DUMMY_MODE(Option.builder("d")
+ DUMMY_MODE(Option.builder("u")
.longOpt("dummy")
.desc("If present will start in dummy mode (dummy external services)")
.build()
diff --git a/pom.xml b/pom.xml
index 45fa4968..ef531bfb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,8 +71,8 @@
<failIfMissingComponentTests>false</failIfMissingComponentTests>
<skipAnalysis>true</skipAnalysis>
- <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
<!-- Docker -->
+ <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
<onap.nexus.dockerregistry.daily>nexus3.onap.org:10003</onap.nexus.dockerregistry.daily>
<onap.nexus.dockerregistry.release>nexus3.onap.org:10002</onap.nexus.dockerregistry.release>
<docker-image.registry>${onap.nexus.dockerregistry.daily}</docker-image.registry>