aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
blob: ef6c2f7602849a8cdcde221bc26be9dbf53653e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package org.onap.dcae.collectors.veshv.impl.adapters

import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
import org.onap.ves.VesEventV5
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import java.io.StringReader
import java.time.Duration
import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject


/**
 * [ConfigurationProvider] that periodically polls a Consul key-value endpoint over HTTP and
 * emits a [CollectorConfiguration] each time the response body differs from the last one seen.
 *
 * The first poll is scheduled with zero delay; subsequent polls happen every [REFRESH_INTERVAL].
 * Change detection compares `String.hashCode()` of the raw response body, and the last seen hash
 * is stored on the instance, so it is shared by all subscriptions of the returned [Flux].
 *
 * Parsing failures (malformed JSON, empty response array, missing keys, unknown domain number)
 * are thrown from the mapping steps and therefore surface as an error signal of the [Flux].
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
    : ConfigurationProvider {


    private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)

    // Hash of the most recently observed response body; null until the first response arrives.
    private val lastConfigurationHash = AtomicReference<Int?>()

    /**
     * Starts polling [url] and returns a stream of configurations, one per detected change.
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
                    .flatMap { http.getResponse(url) }
                    .filter { body -> isNewConfiguration(body) }
                    .map { body -> getConfigurationJson(body) }
                    .map { json -> createCollectorConfiguration(json) }

    /**
     * Records the hash of [body] and reports whether it differs from the previously recorded one.
     *
     * The read and the write happen in one atomic `getAndSet`, so two responses that are
     * processed concurrently cannot both be compared against the same stale hash.
     */
    private fun isNewConfiguration(body: String): Boolean {
        val currentHash = body.hashCode()
        val previousHash: Int? = lastConfigurationHash.getAndSet(currentHash)
        return previousHash != currentHash
    }

    /**
     * Extracts the configuration document from a raw Consul response.
     *
     * The response is read as a JSON array; the `Value` field of its first element is
     * Base64-decoded and parsed as a JSON object.
     *
     * @param str raw HTTP response body
     * @return the decoded configuration as a JSON object
     */
    private fun getConfigurationJson(str: String): JsonObject {
        // JsonReader is Closeable, so release it as soon as the document has been read.
        val response = Json.createReader(StringReader(str))
                .use { reader -> reader.readArray() }
                .getJsonObject(0)
        val decodedValue = String(
                Base64.getDecoder().decode(response.getString("Value")),
                Charsets.UTF_8)
        logger.info("Obtained new configuration from consul:\n$decodedValue")
        return Json.createReader(StringReader(decodedValue))
                .use { reader -> reader.readObject() }
    }

    /**
     * Builds a [CollectorConfiguration] from the decoded configuration document.
     *
     * Reads `kafkaBootstrapServers` and a single route described by the `routing` object
     * (`fromDomain` as a numeric domain id, `toTopic` as the target topic name).
     *
     * @throws IllegalArgumentException when `fromDomain` does not map to a known domain
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {

        // The local name shadows the `routing { }` builder, hence the fully qualified call below.
        val routing = configuration.getJsonObject("routing")

        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
                routing = org.onap.dcae.collectors.veshv.domain.routing {
                    defineRoute {
                        val domainNumber = routing.getInt("fromDomain")
                        // forNumber yields null for numbers without a matching enum constant;
                        // fail with a readable message instead of passing null further down.
                        val domain = requireNotNull(
                                VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(domainNumber)) {
                            "Unknown domain number in routing configuration: $domainNumber"
                        }
                        fromDomain(domain)
                        toTopic(routing.getString("toTopic"))
                        withFixedPartitioning()
                    }
                }.build()
        )
    }

    companion object {
        private const val REFRESH_INTERVAL_MINUTES: Long = 5

        // Period between consecutive polls of the Consul endpoint.
        private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
    }
}