From a788d58f813b71644059623877aca629ab49ab74 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Thu, 5 Jul 2018 14:35:43 +0200 Subject: Implement blocking consul calls Replaced interval based requesting for consul configuration with blocking query calls Closes ONAP-80 Change-Id: If70365bae9fde513d99b047209d085122a5df0dd Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-601 --- .../veshv/impl/adapters/AdapterFactory.kt | 4 +- .../impl/adapters/ConsulConfigurationProvider.kt | 77 +++++++++++++++------- .../collectors/veshv/impl/adapters/HttpAdapter.kt | 36 +++++++--- .../collectors/veshv/model/ServerConfiguration.kt | 2 +- 4 files changed, 84 insertions(+), 35 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index d9e7432d..2a8a3960 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -33,8 +33,8 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider = - ConsulConfigurationProvider(url, updateInterval, httpAdapter()) + fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider = + ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index c70d128a..727f025b 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -26,9 +26,14 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import org.slf4j.LoggerFactory import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.ipc.netty.http.client.HttpClientException +import reactor.retry.Retry +import reactor.retry.retryExponentialBackoff import java.io.StringReader import java.time.Duration -import java.util.Base64 +import java.util.* +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -39,18 +44,17 @@ import javax.json.JsonObject * @since May 2018 */ internal class ConsulConfigurationProvider(private val url: String, - private val updateInterval: Duration, - private val http: HttpAdapter + private val http: HttpAdapter, + private val firstRequestDelay: Duration ) : ConfigurationProvider { - - private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) - private var lastConfigurationHash: AtomicReference = AtomicReference() + private val lastModifyIndex: AtomicReference = AtomicReference(0) + private val lastConfigurationHash: AtomicReference = AtomicReference(0) override fun invoke(): Flux = Flux.concat(createDefaultConfigurationFlux(), createConsulFlux()) - private fun createDefaultConfigurationFlux(): Flux = Flux.just( + private fun createDefaultConfigurationFlux(): Mono = Mono.just( CollectorConfiguration( kafkaBootstrapServers = "kafka:9092", routing = routing { @@ -60,22 +64,45 @@ internal class ConsulConfigurationProvider(private val url: String, withFixedPartitioning() } }.build()) - ).doOnNext { logger.info("Applied default configuration") } - - private fun createConsulFlux(): Flux = Flux.interval(updateInterval) - .flatMap { http.get(url) } - .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " + - "Shutting down..") } - .filter { it.hashCode() != lastConfigurationHash.get() } - .doOnNext { lastConfigurationHash.set(it.hashCode()) } - .map { getConfigurationJson(it) } - .map { createCollectorConfiguration(it) } - - - private fun getConfigurationJson(str: String): JsonObject { - val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0) - val decodedValue = String( - Base64.getDecoder().decode(response.getString("Value"))) + ).doOnNext { logger.info("Applied default configuration") }.delayElement(firstRequestDelay) + + private fun createConsulFlux(): Flux = + http.get(url, mapOf(Pair("index", lastModifyIndex.get()))) + .doOnError { + logger.error("Encountered an error " + + "when trying to acquire configuration from consul. Shutting down..") + } + .map(::parseJsonResponse) + .doOnNext(::updateModifyIndex) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) + .repeat() + + private fun parseJsonResponse(responseString: String): JsonObject = + Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() + + private fun updateModifyIndex(response: JsonObject) = + lastModifyIndex.set(response.getInt("ModifyIndex")) + + private fun extractEncodedConfiguration(response: JsonObject): String = + response.getString("Value") + + private fun filterDifferentValues(base64Value: String): Mono { + val newHash = hashOf(base64Value) + return if (newHash == lastConfigurationHash.get()) { + Mono.empty() + } else { + lastConfigurationHash.set(newHash) + Mono.just(base64Value) + } + } + + private fun hashOf(str: String) = str.hashCode() + + private fun decodeConfiguration(encodedConfiguration: String): JsonObject { + val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration)) logger.info("Obtained new configuration from consul:\n$decodedValue") return Json.createReader(StringReader(decodedValue)).readObject() } @@ -97,5 +124,9 @@ internal class ConsulConfigurationProvider(private val url: String, }.build() ) } + + companion object { + private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index a41cd09f..4503955f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.core.publisher.toMono import reactor.ipc.netty.http.client.HttpClient -import reactor.ipc.netty.http.client.HttpClientResponse import java.nio.charset.Charset /** @@ -34,12 +33,31 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun get(url: String): Mono = - httpClient.get(url) - .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) - } - .flatMap { it.receiveContent().toMono() } - .map { it.content().toString(Charset.defaultCharset()) } + open fun get(url: String, queryParams: Map = emptyMap()): Mono = httpClient + .get(url + createQueryString(queryParams)) + .doOnError { + logger.error("Failed to get resource on path: $url (${it.localizedMessage})") + logger.debug("Nested exception:", it) + } + .flatMap { it.receiveContent().toMono() } + .map { it.content().toString(Charset.defaultCharset()) } + + + private fun createQueryString(params: Map): String { + if (params.isEmpty()) + return "" + + val builder = StringBuilder("?") + params.forEach { (key, value) -> + builder + .append(key) + .append("=") + .append(value) + .append("&") + + } + + return builder.removeSuffix("&").toString() + } + } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 025c59f6..a486996e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -29,7 +29,7 @@ import java.time.Duration data class ServerConfiguration( val port: Int, val configurationUrl: String, - val configurationUpdateInterval: Duration, + val firstRequestDelay: Duration, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) -- cgit 1.2.3-korg