summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2018-11-27 23:28:54 +0000
committerGerrit Code Review <gerrit@onap.org>2018-11-27 23:28:54 +0000
commit4f683d3e17025131a297baa67300da679bb85141 (patch)
tree6cb723d8c356d196cb8acf6818fd5053e7569ef4
parentf66b6052f2ba2c792278ebb38ef3990835c4c3ed (diff)
parente4dd72d3804b697f110a79ba5510c654f0765ec5 (diff)
Merge "Remove Consul configuration decoding"
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt39
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt20
2 files changed, 16 insertions, 43 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index af4bbaa1..ec7c60c0 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -31,7 +31,6 @@ import reactor.retry.Jitter
import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
-import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
@@ -72,41 +71,31 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
override fun invoke(): Flux<CollectorConfiguration> =
Flux.interval(firstRequestDelay, requestInterval)
- .flatMap { askForConfig() }
- .map(::parseJsonResponse)
- .map(::extractEncodedConfiguration)
+ .concatMap { askForConfig() }
.flatMap(::filterDifferentValues)
- .map(::decodeConfiguration)
+ .map(::parseJsonResponse)
.map(::createCollectorConfiguration)
.retryWhen(retry)
private fun askForConfig(): Mono<String> = http.get(url)
- private fun parseJsonResponse(responseString: String): JsonObject =
- Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
-
- private fun extractEncodedConfiguration(response: JsonObject): String =
- response.getString("Value")
-
- private fun filterDifferentValues(base64Value: String): Mono<String> {
- val newHash = hashOf(base64Value)
- return if (newHash == lastConfigurationHash.get()) {
- Mono.empty()
- } else {
- lastConfigurationHash.set(newHash)
- Mono.just(base64Value)
- }
- }
+ private fun filterDifferentValues(configurationString: String) =
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ Mono.empty()
+ } else {
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
+ }
private fun hashOf(str: String) = str.hashCode()
- private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
- val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
- logger.info("Obtained new configuration from consul:\n$decodedValue")
- return Json.createReader(StringReader(decodedValue)).readObject()
- }
+ private fun parseJsonResponse(responseString: String): JsonObject =
+ Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
+ logger.info { "Obtained new configuration from consul:\n${configuration}" }
val routing = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 7a1a4cdc..c6364f74 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -141,9 +141,8 @@ private fun constructConsulConfigProvider(url: String,
const val kafkaAddress = "message-router-kafka"
-fun constructConsulResponse(): String {
-
- val config = """{
+fun constructConsulResponse(): String =
+ """{
"dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
"collector.routing": [
{
@@ -156,18 +155,3 @@ fun constructConsulResponse(): String {
}
]
}"""
-
- val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
-
- return """[
- {
- "CreateIndex": 100,
- "ModifyIndex": 200,
- "LockIndex": 200,
- "Key": "zip",
- "Flags": 0,
- "Value": "$encodedValue",
- "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
- }
- ]"""
-}