diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-07-20 16:37:02 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-03 08:31:09 +0200 |
commit | a2d18b375631d010432089ed18db327c9e4f26bf (patch) | |
tree | ad67ef481839ec7c81fb03daec7990faf715cf20 | |
parent | f4a58fbdbcaaba92a4daae0e2807536c3da4c857 (diff) |
Fix consul request timeout issue
Fix timeout issue when using consul blocking query calls
by switching to standard requests peformed in given interval
Closes ONAP-628
Change-Id: Ifaf7ddfa27045015a7a90c178e0d6d38955c0c58
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-601
10 files changed, 107 insertions, 40 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 2a8a3960..11a0e9bd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient import java.time.Duration @@ -33,8 +34,13 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider = - ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay) + fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + ConsulConfigurationProvider( + configurationProviderParams.configurationUrl, + httpAdapter(), + configurationProviderParams.firstRequestDelay, + configurationProviderParams.requestInterval + ) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 621c63f8..aca0e7e9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -41,10 +41,10 @@ import javax.json.JsonObject */ internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter, - private val firstRequestDelay: Duration + private val firstRequestDelay: Duration, + private val requestInterval: Duration ) : ConfigurationProvider { - private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0) private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) override fun invoke(): Flux<CollectorConfiguration> = @@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String, }.build()) ).doOnNext { logger.info("Applied default configuration") } - private fun createConsulFlux(): Flux<CollectorConfiguration> = - http.get(url, mapOf(Pair("index", lastModifyIndex.get()))) - .doOnError { - logger.error("Encountered an error " + - "when trying to acquire configuration from consul. Shutting down..") - } - .map(::parseJsonResponse) - .doOnNext(::updateModifyIndex) - .map(::extractEncodedConfiguration) - .flatMap(::filterDifferentValues) - .map(::decodeConfiguration) - .map(::createCollectorConfiguration) - .repeat() - .delaySubscription(firstRequestDelay) + private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux + .interval(firstRequestDelay, requestInterval) + .flatMap { http.get(url) } + .doOnError { + logger.error("Encountered an error " + + "when trying to acquire configuration from consul. Shutting down..") + } + .map(::parseJsonResponse) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) private fun parseJsonResponse(responseString: String): JsonObject = Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() - private fun updateModifyIndex(response: JsonObject) = - lastModifyIndex.set(response.getInt("ModifyIndex")) - private fun extractEncodedConfiguration(response: JsonObject): String = response.getString("Value") diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt new file mode 100644 index 00000000..9de34498 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import java.time.Duration + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +data class ConfigurationProviderParams(val configurationUrl: String, + val firstRequestDelay: Duration, + val requestInterval: Duration) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index a486996e..93ad719d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -28,8 +28,7 @@ import java.time.Duration */ data class ServerConfiguration( val port: Int, - val configurationUrl: String, - val firstRequestDelay: Duration, + val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 322ec4e8..808a6fcc 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -41,11 +41,12 @@ internal object ConsulConfigurationProviderTest : Spek({ val httpAdapterMock: HttpAdapter = mock() val firstRequestDelay = Duration.ofMillis(1) + val requestInterval = Duration.ofMillis(1) given("valid resource url") { val validUrl = "http://valid-url/" - val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay) + val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval) whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) .thenReturn(Mono.just(constructConsulResponse())) @@ -80,7 +81,7 @@ internal object ConsulConfigurationProviderTest : Spek({ given("invalid resource url") { val invalidUrl = "http://invalid-url/" - val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay) + val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval) whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) .thenReturn(Mono.error(RuntimeException("Test exception"))) diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt index 35ca09d8..a11fe3d4 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.* @@ -30,6 +31,7 @@ import java.time.Duration internal object DefaultValues { const val PORT = 6061 const val CONSUL_FIRST_REQUEST_DELAY = 10L + const val CONSUL_REQUEST_INTERVAL = 5L const val CONFIG_URL = "" const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key" const val CERT_FILE = "/etc/ves-hv/server.crt" @@ -42,6 +44,7 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu LISTEN_PORT, CONSUL_CONFIG_URL, CONSUL_FIRST_REQUEST_DELAY, + CONSUL_REQUEST_INTERVAL, SSL_DISABLE, PRIVATE_KEY_FILE, CERT_FILE, @@ -52,20 +55,30 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration { val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT) - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL) - val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY) val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) val dummyMode = cmdLine.hasOption(DUMMY_MODE) val security = createSecurityConfiguration(cmdLine) + val configurationProviderParams = createConfigurationProviderParams(cmdLine); return ServerConfiguration( port = port, - configurationUrl = configUrl, - firstRequestDelay = Duration.ofSeconds(firstRequestDelay), + configurationProviderParams = configurationProviderParams, securityConfiguration = security, idleTimeout = Duration.ofSeconds(idleTimeoutSec), dummyMode = dummyMode) } + private fun createConfigurationProviderParams(cmdLine: CommandLine): ConfigurationProviderParams { + val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL) + val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY) + val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL) + + return ConfigurationProviderParams( + configUrl, + Duration.ofSeconds(firstRequestDelay), + Duration.ofSeconds(requestInterval) + ) + } + private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration { val sslDisable = cmdLine.hasOption(SSL_DISABLE) val pkFile = cmdLine.stringValue(PRIVATE_KEY_FILE, DefaultValues.PRIVATE_KEY_FILE) diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 8418cd78..aa1f67b1 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -54,7 +54,9 @@ fun main(args: Array<String>) = private fun createServer(config: ServerConfiguration): Server { val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationUrl, config.firstRequestDelay), sink, MicrometerMetrics() + AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + sink, + MicrometerMetrics() ).createVesHvCollectorProvider() return ServerFactory.createNettyTcpServer(config, collectorProvider) diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt index 0498344c..6da605f4 100644 --- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt @@ -39,6 +39,7 @@ object ArgBasedServerConfigurationTest : Spek({ lateinit var cut: ArgBasedServerConfiguration val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" + val requestInterval = "5" val listenPort = "6969" val pk = Paths.get("/", "etc", "ves", "pk.pem") val cert = Paths.get("/", "etc", "ssl", "certs", "ca-bundle.crt") @@ -63,6 +64,7 @@ object ArgBasedServerConfigurationTest : Spek({ "--listen-port", listenPort, "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, + "--request-interval", requestInterval, "--private-key-file", pk.toFile().absolutePath, "--cert-file", cert.toFile().absolutePath, "--trust-cert-file", trustCert.toFile().absolutePath) @@ -73,11 +75,18 @@ object ArgBasedServerConfigurationTest : Spek({ } it("should set proper first consul request delay") { - assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10)) + assertThat(result.configurationProviderParams.firstRequestDelay) + .isEqualTo(Duration.ofSeconds(10)) + } + + it("should set proper consul request interval") { + assertThat(result.configurationProviderParams.requestInterval) + .isEqualTo(Duration.ofSeconds(5)) } it("should set proper config url") { - assertThat(result.configurationUrl).isEqualTo(configurationUrl) + assertThat(result.configurationProviderParams.configurationUrl) + .isEqualTo(configurationUrl) } it("should set proper security configuration") { @@ -85,8 +94,6 @@ object ArgBasedServerConfigurationTest : Spek({ SecurityConfiguration(sslDisable = true, privateKey = pk, cert = cert, trustedCert = trustCert) ) } - - } given("some parameters are present in the short form") { @@ -101,11 +108,13 @@ object ArgBasedServerConfigurationTest : Spek({ } it("should set proper first consul request delay") { - assertThat(result.firstRequestDelay).isEqualTo(Duration.ofSeconds(10)) + assertThat(result.configurationProviderParams.firstRequestDelay) + .isEqualTo(Duration.ofSeconds(10)) } it("should set proper config url") { - assertThat(result.configurationUrl).isEqualTo(configurationUrl) + assertThat(result.configurationProviderParams.configurationUrl) + .isEqualTo(configurationUrl) } } @@ -121,14 +130,20 @@ object ArgBasedServerConfigurationTest : Spek({ } it("should set default config url") { - assertThat(result.configurationUrl).isEqualTo(DefaultValues.CONFIG_URL) + assertThat(result.configurationProviderParams.configurationUrl) + .isEqualTo(DefaultValues.CONFIG_URL) } it("should set default first consul request delay") { - assertThat(result.firstRequestDelay) + assertThat(result.configurationProviderParams.firstRequestDelay) .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_FIRST_REQUEST_DELAY)) } + it("should set default consul request interval") { + assertThat(result.configurationProviderParams.requestInterval) + .isEqualTo(Duration.ofSeconds(DefaultValues.CONSUL_REQUEST_INTERVAL)) + } + on("security config") { val securityConfiguration = result.securityConfiguration diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt index 27213c95..a654868e 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt @@ -41,6 +41,12 @@ enum class CommandLineOption(val option: Option) { .desc("Delay of first request to consul in seconds") .build() ), + CONSUL_REQUEST_INTERVAL(Option.builder("I") + .longOpt("request-interval") + .hasArg() + .desc("Interval of consul configuration requests in seconds") + .build() + ), VES_HV_PORT(Option.builder("p") .longOpt("ves-port") .required() @@ -105,7 +111,7 @@ enum class CommandLineOption(val option: Option) { |connection might be closed.""".trimMargin()) .build() ), - DUMMY_MODE(Option.builder("d") + DUMMY_MODE(Option.builder("u") .longOpt("dummy") .desc("If present will start in dummy mode (dummy external services)") .build() @@ -71,8 +71,8 @@ <failIfMissingComponentTests>false</failIfMissingComponentTests> <skipAnalysis>true</skipAnalysis> - <skipDocker>true</skipDocker> <!-- TODO: unskip docker --> <!-- Docker --> + <skipDocker>true</skipDocker> <!-- TODO: unskip docker --> <onap.nexus.dockerregistry.daily>nexus3.onap.org:10003</onap.nexus.dockerregistry.daily> <onap.nexus.dockerregistry.release>nexus3.onap.org:10002</onap.nexus.dockerregistry.release> <docker-image.registry>${onap.nexus.dockerregistry.daily}</docker-image.registry> |