aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2018-07-20 16:37:02 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-03 08:31:09 +0200
commita2d18b375631d010432089ed18db327c9e4f26bf (patch)
treead67ef481839ec7c81fb03daec7990faf715cf20 /hv-collector-core/src/main/kotlin
parentf4a58fbdbcaaba92a4daae0e2807536c3da4c857 (diff)
Fix consul request timeout issue
Fix timeout issue when using consul blocking query calls by switching to standard requests peformed in given interval Closes ONAP-628 Change-Id: Ifaf7ddfa27045015a7a90c178e0d6d38955c0c58 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt10
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt33
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt3
4 files changed, 53 insertions, 23 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 2a8a3960..11a0e9bd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import reactor.ipc.netty.http.client.HttpClient
import java.time.Duration
@@ -33,8 +34,13 @@ object AdapterFactory {
fun kafkaSink(): SinkProvider = KafkaSinkProvider()
fun loggingSink(): SinkProvider = LoggingSinkProvider()
- fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider =
- ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay)
+ fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConsulConfigurationProvider(
+ configurationProviderParams.configurationUrl,
+ httpAdapter(),
+ configurationProviderParams.firstRequestDelay,
+ configurationProviderParams.requestInterval
+ )
fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 621c63f8..aca0e7e9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -41,10 +41,10 @@ import javax.json.JsonObject
*/
internal class ConsulConfigurationProvider(private val url: String,
private val http: HttpAdapter,
- private val firstRequestDelay: Duration
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration
) : ConfigurationProvider {
- private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
override fun invoke(): Flux<CollectorConfiguration> =
@@ -62,27 +62,22 @@ internal class ConsulConfigurationProvider(private val url: String,
}.build())
).doOnNext { logger.info("Applied default configuration") }
- private fun createConsulFlux(): Flux<CollectorConfiguration> =
- http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
- .doOnError {
- logger.error("Encountered an error " +
- "when trying to acquire configuration from consul. Shutting down..")
- }
- .map(::parseJsonResponse)
- .doOnNext(::updateModifyIndex)
- .map(::extractEncodedConfiguration)
- .flatMap(::filterDifferentValues)
- .map(::decodeConfiguration)
- .map(::createCollectorConfiguration)
- .repeat()
- .delaySubscription(firstRequestDelay)
+ private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
+ .interval(firstRequestDelay, requestInterval)
+ .flatMap { http.get(url) }
+ .doOnError {
+ logger.error("Encountered an error " +
+ "when trying to acquire configuration from consul. Shutting down..")
+ }
+ .map(::parseJsonResponse)
+ .map(::extractEncodedConfiguration)
+ .flatMap(::filterDifferentValues)
+ .map(::decodeConfiguration)
+ .map(::createCollectorConfiguration)
private fun parseJsonResponse(responseString: String): JsonObject =
Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
- private fun updateModifyIndex(response: JsonObject) =
- lastModifyIndex.set(response.getInt("ModifyIndex"))
-
private fun extractEncodedConfiguration(response: JsonObject): String =
response.getString("Value")
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
new file mode 100644
index 00000000..9de34498
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+data class ConfigurationProviderParams(val configurationUrl: String,
+ val firstRequestDelay: Duration,
+ val requestInterval: Duration)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index a486996e..93ad719d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -28,8 +28,7 @@ import java.time.Duration
*/
data class ServerConfiguration(
val port: Int,
- val configurationUrl: String,
- val firstRequestDelay: Duration,
+ val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val dummyMode: Boolean = false)