diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2018-07-20 16:37:02 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-03 08:31:09 +0200 |
commit | a2d18b375631d010432089ed18db327c9e4f26bf (patch) | |
tree | ad67ef481839ec7c81fb03daec7990faf715cf20 /hv-collector-core | |
parent | f4a58fbdbcaaba92a4daae0e2807536c3da4c857 (diff) |
Fix consul request timeout issue
Fix timeout issue when using consul blocking query calls
by switching to standard requests peformed in given interval
Closes ONAP-628
Change-Id: Ifaf7ddfa27045015a7a90c178e0d6d38955c0c58
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
5 files changed, 56 insertions, 25 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 2a8a3960..11a0e9bd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient import java.time.Duration @@ -33,8 +34,13 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider = - ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay) + fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + ConsulConfigurationProvider( + configurationProviderParams.configurationUrl, + httpAdapter(), + configurationProviderParams.firstRequestDelay, + configurationProviderParams.requestInterval + ) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 621c63f8..aca0e7e9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -41,10 +41,10 @@ import javax.json.JsonObject */ internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter, - private val firstRequestDelay: Duration + private val firstRequestDelay: Duration, + private val requestInterval: Duration ) : ConfigurationProvider { - private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0) private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) override fun invoke(): Flux<CollectorConfiguration> = @@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String, }.build()) ).doOnNext { logger.info("Applied default configuration") } - private fun createConsulFlux(): Flux<CollectorConfiguration> = - http.get(url, mapOf(Pair("index", lastModifyIndex.get()))) - .doOnError { - logger.error("Encountered an error " + - "when trying to acquire configuration from consul. Shutting down..") - } - .map(::parseJsonResponse) - .doOnNext(::updateModifyIndex) - .map(::extractEncodedConfiguration) - .flatMap(::filterDifferentValues) - .map(::decodeConfiguration) - .map(::createCollectorConfiguration) - .repeat() - .delaySubscription(firstRequestDelay) + private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux + .interval(firstRequestDelay, requestInterval) + .flatMap { http.get(url) } + .doOnError { + logger.error("Encountered an error " + + "when trying to acquire configuration from consul. Shutting down..") + } + .map(::parseJsonResponse) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) private fun parseJsonResponse(responseString: String): JsonObject = Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() - private fun updateModifyIndex(response: JsonObject) = - lastModifyIndex.set(response.getInt("ModifyIndex")) - private fun extractEncodedConfiguration(response: JsonObject): String = response.getString("Value") diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt new file mode 100644 index 00000000..9de34498 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import java.time.Duration + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since July 2018 + */ +data class ConfigurationProviderParams(val configurationUrl: String, + val firstRequestDelay: Duration, + val requestInterval: Duration) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index a486996e..93ad719d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -28,8 +28,7 @@ import java.time.Duration */ data class ServerConfiguration( val port: Int, - val configurationUrl: String, - val firstRequestDelay: Duration, + val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 322ec4e8..808a6fcc 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -41,11 +41,12 @@ internal object ConsulConfigurationProviderTest : Spek({ val httpAdapterMock: HttpAdapter = mock() val firstRequestDelay = Duration.ofMillis(1) + val requestInterval = Duration.ofMillis(1) given("valid resource url") { val validUrl = "http://valid-url/" - val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay) + val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval) whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) .thenReturn(Mono.just(constructConsulResponse())) @@ -80,7 +81,7 @@ internal object ConsulConfigurationProviderTest : Spek({ given("invalid resource url") { val invalidUrl = "http://invalid-url/" - val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay) + val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval) whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) .thenReturn(Mono.error(RuntimeException("Test exception"))) |