blob: ef6c2f7602849a8cdcde221bc26be9dbf53653e3 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
import org.onap.ves.VesEventV5
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import java.io.StringReader
import java.time.Duration
import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
: ConfigurationProvider {
private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
override fun invoke(): Flux<CollectorConfiguration> =
Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
.flatMap { http.getResponse(url) }
.filter { body -> body.hashCode() != lastConfigurationHash.get() }
.doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
.map { str -> getConfigurationJson(str) }
.map { json -> createCollectorConfiguration(json) }
private fun getConfigurationJson(str: String): JsonObject {
val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
val decodedValue = String(
Base64.getDecoder().decode(response.getString("Value")))
logger.info("Obtained new configuration from consul:\n$decodedValue")
return Json.createReader(StringReader(decodedValue)).readObject()
}
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
val routing = configuration.getJsonObject("routing")
return CollectorConfiguration(
kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
routing = org.onap.dcae.collectors.veshv.domain.routing {
defineRoute {
fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
toTopic(routing.getString("toTopic"))
withFixedPartitioning()
}
}.build()
)
}
companion object {
private const val REFRESH_INTERVAL_MINUTES: Long = 5
private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
}
}
|