diff options
Diffstat (limited to 'hv-collector-core')
6 files changed, 138 insertions, 62 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index d9e7432d..2a8a3960 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -33,8 +33,8 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider = - ConsulConfigurationProvider(url, updateInterval, httpAdapter()) + fun consulConfigurationProvider(url: String, firstRequestDelay: Duration): ConfigurationProvider = + ConsulConfigurationProvider(url, httpAdapter(), firstRequestDelay) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index c70d128a..727f025b 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -26,9 +26,14 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import org.slf4j.LoggerFactory import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.ipc.netty.http.client.HttpClientException +import reactor.retry.Retry +import reactor.retry.retryExponentialBackoff import java.io.StringReader import java.time.Duration -import java.util.Base64 +import java.util.* +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -39,18 +44,17 @@ import javax.json.JsonObject * @since May 2018 */ internal class ConsulConfigurationProvider(private val url: String, - private val updateInterval: Duration, - private val http: HttpAdapter + private val http: HttpAdapter, + private val firstRequestDelay: Duration ) : ConfigurationProvider { - - private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) - private var lastConfigurationHash: AtomicReference<Int> = AtomicReference() + private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0) + private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) override fun invoke(): Flux<CollectorConfiguration> = Flux.concat(createDefaultConfigurationFlux(), createConsulFlux()) - private fun createDefaultConfigurationFlux(): Flux<CollectorConfiguration> = Flux.just( + private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just( CollectorConfiguration( kafkaBootstrapServers = "kafka:9092", routing = routing { @@ -60,22 +64,45 @@ internal class ConsulConfigurationProvider(private val url: String, withFixedPartitioning() } }.build()) - ).doOnNext { logger.info("Applied default configuration") } - - private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux.interval(updateInterval) - .flatMap { http.get(url) } - .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " + - "Shutting down..") } - .filter { it.hashCode() != lastConfigurationHash.get() } - .doOnNext { lastConfigurationHash.set(it.hashCode()) } - .map { getConfigurationJson(it) } - .map { createCollectorConfiguration(it) } - - - private fun getConfigurationJson(str: String): JsonObject { - val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0) - val decodedValue = String( - Base64.getDecoder().decode(response.getString("Value"))) + ).doOnNext { logger.info("Applied default configuration") }.delayElement(firstRequestDelay) + + private fun createConsulFlux(): Flux<CollectorConfiguration> = + http.get(url, mapOf(Pair("index", lastModifyIndex.get()))) + .doOnError { + logger.error("Encountered an error " + + "when trying to acquire configuration from consul. Shutting down..") + } + .map(::parseJsonResponse) + .doOnNext(::updateModifyIndex) + .map(::extractEncodedConfiguration) + .flatMap(::filterDifferentValues) + .map(::decodeConfiguration) + .map(::createCollectorConfiguration) + .repeat() + + private fun parseJsonResponse(responseString: String): JsonObject = + Json.createReader(StringReader(responseString)).readArray().first().asJsonObject() + + private fun updateModifyIndex(response: JsonObject) = + lastModifyIndex.set(response.getInt("ModifyIndex")) + + private fun extractEncodedConfiguration(response: JsonObject): String = + response.getString("Value") + + private fun filterDifferentValues(base64Value: String): Mono<String> { + val newHash = hashOf(base64Value) + return if (newHash == lastConfigurationHash.get()) { + Mono.empty() + } else { + lastConfigurationHash.set(newHash) + Mono.just(base64Value) + } + } + + private fun hashOf(str: String) = str.hashCode() + + private fun decodeConfiguration(encodedConfiguration: String): JsonObject { + val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration)) logger.info("Obtained new configuration from consul:\n$decodedValue") return Json.createReader(StringReader(decodedValue)).readObject() } @@ -97,5 +124,9 @@ internal class ConsulConfigurationProvider(private val url: String, }.build() ) } + + companion object { + private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index a41cd09f..4503955f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.core.publisher.toMono import reactor.ipc.netty.http.client.HttpClient -import reactor.ipc.netty.http.client.HttpClientResponse import java.nio.charset.Charset /** @@ -34,12 +33,31 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun get(url: String): Mono<String> = - httpClient.get(url) - .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) - } - .flatMap { it.receiveContent().toMono() } - .map { it.content().toString(Charset.defaultCharset()) } + open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient + .get(url + createQueryString(queryParams)) + .doOnError { + logger.error("Failed to get resource on path: $url (${it.localizedMessage})") + logger.debug("Nested exception:", it) + } + .flatMap { it.receiveContent().toMono() } + .map { it.content().toString(Charset.defaultCharset()) } + + + private fun createQueryString(params: Map<String, Any>): String { + if (params.isEmpty()) + return "" + + val builder = StringBuilder("?") + params.forEach { (key, value) -> + builder + .append(key) + .append("=") + .append(value) + .append("&") + + } + + return builder.removeSuffix("&").toString() + } + } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 025c59f6..a486996e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -29,7 +29,7 @@ import java.time.Duration data class ServerConfiguration( val port: Int, val configurationUrl: String, - val configurationUpdateInterval: Duration, + val firstRequestDelay: Duration, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index dd190848..c98c97a6 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.mockito.Mockito import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.ipc.netty.http.client.HttpClient @@ -39,15 +41,16 @@ import kotlin.test.assertEquals */ internal object ConsulConfigurationProviderTest : Spek({ - val updateInterval = Duration.ofMillis(1) val httpAdapterMock: HttpAdapter = mock() + val firstRequestDelay = Duration.ofMillis(1) given("valid resource url") { val validUrl = "http://valid-url/" - val consulConfigProvider = ConsulConfigurationProvider(validUrl, updateInterval, httpAdapterMock) + val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay) - whenever(httpAdapterMock.get(validUrl)).thenReturn(Mono.just(constructConsulResponse())) + whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) + .thenReturn(Mono.just(constructConsulResponse())) it("should use default configuration at the beginning, " + "then apply received configuration") { @@ -79,9 +82,10 @@ internal object ConsulConfigurationProviderTest : Spek({ given("invalid resource url") { val invalidUrl = "http://invalid-url/" - val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, updateInterval, httpAdapterMock) + val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay) - whenever(httpAdapterMock.get(invalidUrl)).thenReturn(Mono.error(RuntimeException("Test exception"))) + whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) + .thenReturn(Mono.error(RuntimeException("Test exception"))) it("should use default configuration at the beginning, then should interrupt the flux") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt index 79eda995..123d8f72 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -20,10 +20,12 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.whenever import io.netty.buffer.Unpooled import io.netty.handler.codec.http.HttpContent import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import reactor.core.publisher.Flux @@ -32,44 +34,65 @@ import reactor.ipc.netty.http.client.HttpClient import reactor.ipc.netty.http.client.HttpClientResponse import reactor.test.StepVerifier import java.nio.charset.Charset -import kotlin.test.assertEquals /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ internal object HttpAdapterTest : Spek({ - - given("valid resource url") { + describe("HttpAdapter") { val httpClientMock: HttpClient = mock() val httpAdapter = HttpAdapter(httpClientMock) - val validUrl = "http://valid-url/" - val responseContent = """{"key1": "value1", "key2": "value2"}""" - val httpResponse = createHttpResponseMock(responseContent) - whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse)) - it("should return response string") { - StepVerifier - .create(httpAdapter.get(validUrl)) - .expectNext(responseContent) + given("url without query params") { + val initialUrl = "http://test-url" + whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty()) + + it("should not append query string") { + httpAdapter.get(initialUrl) + verify(httpClientMock).get(initialUrl) + } } - } - given("invalid resource url") { + given("url with query params") { + val queryParams = mapOf(Pair("key", "value")) + val initialUrl = "http://test-url" + val expectedUrl = "http://test-url?key=value" + whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty()) - val httpClientMock: HttpClient = mock() - val httpAdapter = HttpAdapter(httpClientMock) - val invalidUrl = "http://invalid-url/" - val exceptionMessage = "Test exception" - whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage))) + it("should parse them to query string and append to url") { + httpAdapter.get(initialUrl, queryParams) + verify(httpClientMock).get(expectedUrl) + } + } - it("should interrupt the flux") { - StepVerifier - .create(httpAdapter.get(invalidUrl)) - .verifyErrorMessage(exceptionMessage) + given("valid resource url") { + val validUrl = "http://valid-url/" + val responseContent = """{"key1": "value1", "key2": "value2"}""" + val httpResponse = createHttpResponseMock(responseContent) + whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse)) + + it("should return response string") { + StepVerifier + .create(httpAdapter.get(validUrl)) + .expectNext(responseContent) + } + } + + given("invalid resource url") { + val invalidUrl = "http://invalid-url/" + val exceptionMessage = "Test exception" + whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage))) + + it("should interrupt the flux") { + StepVerifier + .create(httpAdapter.get(invalidUrl)) + .verifyErrorMessage(exceptionMessage) + } } } + }) fun createHttpResponseMock(content: String): HttpClientResponse { |