aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt63
1 files changed, 63 insertions, 0 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
new file mode 100644
index 00000000..ef6c2f76
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -0,0 +1,63 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.ves.VesEventV5
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Flux
+import java.io.StringReader
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.atomic.AtomicReference
+import javax.json.Json
+import javax.json.JsonObject
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
+ : ConfigurationProvider {
+
+
+ private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+ private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
+ .flatMap { http.getResponse(url) }
+ .filter { body -> body.hashCode() != lastConfigurationHash.get() }
+ .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
+ .map { str -> getConfigurationJson(str) }
+ .map { json -> createCollectorConfiguration(json) }
+
+ private fun getConfigurationJson(str: String): JsonObject {
+ val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
+ val decodedValue = String(
+ Base64.getDecoder().decode(response.getString("Value")))
+ logger.info("Obtained new configuration from consul:\n$decodedValue")
+ return Json.createReader(StringReader(decodedValue)).readObject()
+ }
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
+
+ val routing = configuration.getJsonObject("routing")
+
+ return CollectorConfiguration(
+ kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
+ routing = org.onap.dcae.collectors.veshv.domain.routing {
+ defineRoute {
+ fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
+ toTopic(routing.getString("toTopic"))
+ withFixedPartitioning()
+ }
+ }.build()
+ )
+ }
+
+ companion object {
+ private const val REFRESH_INTERVAL_MINUTES: Long = 5
+ private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
+ }
+}