diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2018-11-27 23:28:54 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-11-27 23:28:54 +0000 |
commit | 4f683d3e17025131a297baa67300da679bb85141 (patch) | |
tree | 6cb723d8c356d196cb8acf6818fd5053e7569ef4 /hv-collector-core | |
parent | f66b6052f2ba2c792278ebb38ef3990835c4c3ed (diff) | |
parent | e4dd72d3804b697f110a79ba5510c654f0765ec5 (diff) |
Merge "Remove Consul configuration decoding"
Diffstat (limited to 'hv-collector-core')
2 files changed, 16 insertions, 43 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index af4bbaa1..ec7c60c0 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -31,7 +31,6 @@ import reactor.retry.Jitter import reactor.retry.Retry import java.io.StringReader import java.time.Duration -import java.util.* import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -72,41 +71,31 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, override fun invoke(): Flux<CollectorConfiguration> = Flux.interval(firstRequestDelay, requestInterval) - .flatMap { askForConfig() } - .map(::parseJsonResponse) - .map(::extractEncodedConfiguration) + .concatMap { askForConfig() } .flatMap(::filterDifferentValues) - .map(::decodeConfiguration) + .map(::parseJsonResponse) .map(::createCollectorConfiguration) .retryWhen(retry) private fun askForConfig(): Mono<String> = http.get(url) - private fun parseJsonResponse(responseString: String): JsonObject = - Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() - - private fun extractEncodedConfiguration(response: JsonObject): String = - response.getString("Value") - - private fun filterDifferentValues(base64Value: String): Mono<String> { - val newHash = hashOf(base64Value) - return if (newHash == lastConfigurationHash.get()) { - Mono.empty() - } else { - lastConfigurationHash.set(newHash) - Mono.just(base64Value) - } - } + private fun filterDifferentValues(configurationString: String) = + hashOf(configurationString).let { + if (it == lastConfigurationHash.get()) { + Mono.empty() + } else { + lastConfigurationHash.set(it) + Mono.just(configurationString) + } + } private fun hashOf(str: String) = str.hashCode() - private fun decodeConfiguration(encodedConfiguration: String): JsonObject { - val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration)) - logger.info("Obtained new configuration from consul:\n$decodedValue") - return Json.createReader(StringReader(decodedValue)).readObject() - } + private fun parseJsonResponse(responseString: String): JsonObject = + Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { + logger.info { "Obtained new configuration from consul:\n${configuration}" } val routing = configuration.getJsonArray("collector.routing") return CollectorConfiguration( diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 7a1a4cdc..c6364f74 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -141,9 +141,8 @@ private fun constructConsulConfigProvider(url: String, const val kafkaAddress = "message-router-kafka" -fun constructConsulResponse(): String { - - val config = """{ +fun constructConsulResponse(): String = + """{ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", "collector.routing": [ { @@ -156,18 +155,3 @@ fun constructConsulResponse(): String { } ] }""" - - val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) - - return """[ - { - "CreateIndex": 100, - "ModifyIndex": 200, - "LockIndex": 200, - "Key": "zip", - "Flags": 0, - "Value": "$encodedValue", - "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e" - } - ]""" -} |