From a2d18b375631d010432089ed18db327c9e4f26bf Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Fri, 20 Jul 2018 16:37:02 +0200 Subject: Fix consul request timeout issue Fix timeout issue when using consul blocking query calls by switching to standard requests peformed in given interval Closes ONAP-628 Change-Id: Ifaf7ddfa27045015a7a90c178e0d6d38955c0c58 Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-601 --- .../veshv/impl/adapters/AdapterFactory.kt | 10 +++++-- .../impl/adapters/ConsulConfigurationProvider.kt | 33 +++++++++------------- .../veshv/model/ConfigurationProviderParams.kt | 30 ++++++++++++++++++++ .../collectors/veshv/model/ServerConfiguration.kt | 3 +- 4 files changed, 53 insertions(+), 23 deletions(-) create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 2a8a3960..11a0e9bd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import reactor.ipc.netty.http.client.HttpClient import java.time.Duration @@ -33,8 +34,13 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider = - ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay) + fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + ConsulConfigurationProvider( + configurationProviderParams.configurationUrl, + httpAdapter(), + configurationProviderParams.firstRequestDelay, + configurationProviderParams.requestInterval + ) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 621c63f8..aca0e7e9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -41,10 +41,10 @@ import javax.json.JsonObject */ internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter, - private val firstRequestDelay: Duration + private val firstRequestDelay: Duration, + private val requestInterval: Duration ) : ConfigurationProvider { - private val lastModifyIndex: AtomicReference = AtomicReference(0) private val lastConfigurationHash: AtomicReference = AtomicReference(0) override fun invoke(): Flux = @@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String, }.build()) ).doOnNext { logger.info("Applied default configuration") } - private fun createConsulFlux(): Flux = - http.get(url, mapOf(Pair("index", lastModifyIndex.get()))) - .doOnError { - logger.error("Encountered an error " + - "when trying to acquire configuration from consul. Shutting down..") - } - .map(::parseJsonResponse) - .doOnNext(::updateModifyIndex) - .map(::extractEncodedConfiguration) - .flatMap(::filterDifferentValues) - .map(::decodeConfiguration) - .map(::createCollectorConfiguration) - .repeat() - .delaySubscription(firstRequestDelay) + private fun createConsulFlux(): Flux = Flux + .interval(firstRequestDelay, requestInterval) + .flatMap { http.get(url) } + .doOnError { + logger.error("Encountered an error " + + "when trying to acquire configuration from consul. Shutting down..") + } + .map(::parseJsonResponse) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) private fun parseJsonResponse(responseString: String): JsonObject = Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() - private fun updateModifyIndex(response: JsonObject) = - lastModifyIndex.set(response.getInt("ModifyIndex")) - private fun extractEncodedConfiguration(response: JsonObject): String = response.getString("Value") diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt new file mode 100644 index 00000000..9de34498 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import java.time.Duration + +/** + * @author Jakub Dudycz + * @since July 2018 + */ +data class ConfigurationProviderParams(val configurationUrl: String, + val firstRequestDelay: Duration, + val requestInterval: Duration) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index a486996e..93ad719d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -28,8 +28,7 @@ import java.time.Duration */ data class ServerConfiguration( val port: Int, - val configurationUrl: String, - val firstRequestDelay: Duration, + val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) -- cgit 1.2.3-korg