diff options
Diffstat (limited to 'hv-collector-core')
8 files changed, 142 insertions, 66 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 8785180b..06047fd4 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -49,7 +49,9 @@ class CollectorFactory(val configuration: ConfigurationProvider, } private fun createVesHvCollector(): Flux<Collector> = - configuration().map(this::createVesHvCollector) + configuration() + .doOnError { System.exit(ERROR_CODE) } + .map(this::createVesHvCollector) private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( @@ -61,5 +63,8 @@ class CollectorFactory(val configuration: ConfigurationProvider, metrics = metrics) } + companion object { + const val ERROR_CODE = 3 + } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 358be108..d9e7432d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -22,9 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -34,13 +33,8 @@ object AdapterFactory { fun kafkaSink(): SinkProvider = KafkaSinkProvider() fun loggingSink(): SinkProvider = LoggingSinkProvider() - fun staticConfigurationProvider(config: CollectorConfiguration) = - object : ConfigurationProvider { - override fun invoke() = Flux.just(config) - } - - fun consulConfigurationProvider(url: String): ConfigurationProvider = - ConsulConfigurationProvider(url, httpAdapter()) + fun consulConfigurationProvider(url: String, updateInterval: Duration): ConfigurationProvider = + ConsulConfigurationProvider(url, updateInterval, httpAdapter()) fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 04e4927d..c70d128a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -21,12 +21,14 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.ves.VesEventV5 +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import java.io.StringReader import java.time.Duration -import java.util.* +import java.util.Base64 import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -36,20 +38,39 @@ import javax.json.JsonObject * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ -internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter) - : ConfigurationProvider { +internal class ConsulConfigurationProvider(private val url: String, + private val updateInterval: Duration, + private val http: HttpAdapter +) : ConfigurationProvider { private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java) private var lastConfigurationHash: AtomicReference<Int> = AtomicReference() override fun invoke(): Flux<CollectorConfiguration> = - Flux.interval(Duration.ZERO, REFRESH_INTERVAL) - .flatMap { http.getResponse(url) } - .filter { body -> body.hashCode() != lastConfigurationHash.get() } - .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) } - .map { str -> getConfigurationJson(str) } - .map { json -> createCollectorConfiguration(json) } + Flux.concat(createDefaultConfigurationFlux(), createConsulFlux()) + + private fun createDefaultConfigurationFlux(): Flux<CollectorConfiguration> = Flux.just( + CollectorConfiguration( + kafkaBootstrapServers = "kafka:9092", + routing = routing { + defineRoute { + fromDomain(HVRANMEAS) + toTopic("ves_hvRanMeas") + withFixedPartitioning() + } + }.build()) + ).doOnNext { logger.info("Applied default configuration") } + + private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux.interval(updateInterval) + .flatMap { http.get(url) } + .doOnError { logger.error("Encountered an error when trying to acquire configuration from consul. " + + "Shutting down..") } + .filter { it.hashCode() != lastConfigurationHash.get() } + .doOnNext { lastConfigurationHash.set(it.hashCode()) } + .map { getConfigurationJson(it) } + .map { createCollectorConfiguration(it) } + private fun getConfigurationJson(str: String): JsonObject { val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0) @@ -60,23 +81,21 @@ internal class ConsulConfigurationProvider(private val url: String, private val } private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - - val routing = configuration.getJsonObject("routing") + val routing = configuration.getJsonArray("routing") return CollectorConfiguration( kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"), routing = org.onap.dcae.collectors.veshv.model.routing { - defineRoute { - fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain"))) - toTopic(routing.getString("toTopic")) - withFixedPartitioning() + for (route in routing) { + val routeObj = route.asJsonObject() + defineRoute { + fromDomain(forNumber(routeObj.getInt("fromDomain"))) + toTopic(routeObj.getString("toTopic")) + withFixedPartitioning() + } } }.build() ) } - - companion object { - private const val REFRESH_INTERVAL_MINUTES: Long = 5 - private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES) - } } + diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 236e3cb7..a41cd09f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -34,15 +34,12 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - open fun getResponse(url: String): Mono<String> = + open fun get(url: String): Mono<String> = httpClient.get(url) - .onErrorResume { e -> unableToGetResource(e, url) } - .flatMap { res -> res.receiveContent().toMono() } - .map { content -> content.content().toString(Charset.defaultCharset()) } - - - private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> { - logger.info("Failed to get resource on path: $url\n${e.localizedMessage}") - return Mono.empty() - } + .doOnError { + logger.error("Failed to get resource on path: $url (${it.localizedMessage})") + logger.debug("Nested exception:", it) + } + .flatMap { it.receiveContent().toMono() } + .map { it.content().toString(Charset.defaultCharset()) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 580d36c5..056e0557 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -31,10 +31,12 @@ import reactor.core.publisher.Flux * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { +internal class WireChunkDecoder(private val decoder: WireFrameDecoder, + alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() - fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf) + fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter + .createFlux(decoder, streamBuffer, byteBuf) .doOnSubscribe { logIncomingMessage(byteBuf) } .doOnNext(this::logDecodedWireMessage) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 67a7d6f2..025c59f6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -29,6 +29,7 @@ import java.time.Duration data class ServerConfiguration( val port: Int, val configurationUrl: String, + val configurationUpdateInterval: Duration, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index b2da430d..dd190848 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -20,12 +20,16 @@ package org.onap.dcae.collectors.veshv.impl.adapters import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono +import reactor.ipc.netty.http.client.HttpClient +import reactor.test.StepVerifier +import java.time.Duration import java.util.* import kotlin.test.assertEquals @@ -35,20 +39,63 @@ import kotlin.test.assertEquals */ internal object ConsulConfigurationProviderTest : Spek({ + val updateInterval = Duration.ofMillis(1) + val httpAdapterMock: HttpAdapter = mock() + given("valid resource url") { - val testUrl = "http://valid-url/" - val httpAdapterMock: HttpAdapter = mock() - val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock) - whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse())) + val validUrl = "http://valid-url/" + val consulConfigProvider = ConsulConfigurationProvider(validUrl, updateInterval, httpAdapterMock) + + whenever(httpAdapterMock.get(validUrl)).thenReturn(Mono.just(constructConsulResponse())) + + it("should use default configuration at the beginning, " + + "then apply received configuration") { + + StepVerifier.create(consulConfigProvider().take(2)) + .consumeNextWith { + + assertEquals("kafka:9092", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertEquals(Domain.HVRANMEAS, route1.domain) + assertEquals("ves_hvRanMeas", route1.targetTopic) + } + .consumeNextWith { + + assertEquals("kafka:9093", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertEquals(Domain.HEARTBEAT, route1.domain) + assertEquals("test-topic-1", route1.targetTopic) + + val route2 = it.routing.routes[1] + assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route2.domain) + assertEquals("test-topic-2", route2.targetTopic) + }.verifyComplete() + } + } + given("invalid resource url") { + + val invalidUrl = "http://invalid-url/" + val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, updateInterval, httpAdapterMock) + + whenever(httpAdapterMock.get(invalidUrl)).thenReturn(Mono.error(RuntimeException("Test exception"))) + + it("should use default configuration at the beginning, then should interrupt the flux") { - it("should create valid collector configuration") { - val response = consulConfigProvider().blockFirst() - assertEquals("val1", response.kafkaBootstrapServers) - val route = response.routing.routes[0] - assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain) - assertEquals("val3", route.targetTopic) + StepVerifier.create(consulConfigProvider()) + .consumeNextWith { + + + assertEquals("kafka:9092", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertEquals(Domain.HVRANMEAS, route1.domain) + assertEquals("ves_hvRanMeas", route1.targetTopic) + } + .verifyErrorMessage("Test exception") } } }) @@ -56,11 +103,17 @@ internal object ConsulConfigurationProviderTest : Spek({ fun constructConsulResponse(): String { val config = """{ - "kafkaBootstrapServers": "val1", - "routing": { - "fromDomain": 2, - "toTopic": "val3" - } + "kafkaBootstrapServers": "kafka:9093", + "routing": [ + { + "fromDomain": 1, + "toTopic": "test-topic-1" + }, + { + "fromDomain": 2, + "toTopic": "test-topic-2" + } + ] }""" val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt index 66288453..79eda995 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -30,6 +30,7 @@ import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.ipc.netty.http.client.HttpClient import reactor.ipc.netty.http.client.HttpClientResponse +import reactor.test.StepVerifier import java.nio.charset.Charset import kotlin.test.assertEquals @@ -43,13 +44,15 @@ internal object HttpAdapterTest : Spek({ val httpClientMock: HttpClient = mock() val httpAdapter = HttpAdapter(httpClientMock) - val testUrl = "http://valid-url/" + val validUrl = "http://valid-url/" val responseContent = """{"key1": "value1", "key2": "value2"}""" val httpResponse = createHttpResponseMock(responseContent) - whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse)) + whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse)) it("should return response string") { - assertEquals(responseContent, httpAdapter.getResponse(testUrl).block()) + StepVerifier + .create(httpAdapter.get(validUrl)) + .expectNext(responseContent) } } @@ -57,12 +60,14 @@ internal object HttpAdapterTest : Spek({ val httpClientMock: HttpClient = mock() val httpAdapter = HttpAdapter(httpClientMock) - val testUrl = "http://invalid-url/" - whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception"))) + val invalidUrl = "http://invalid-url/" + val exceptionMessage = "Test exception" + whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage))) - - it("should return null response") { - assertEquals(null, httpAdapter.getResponse(testUrl).block()) + it("should interrupt the flux") { + StepVerifier + .create(httpAdapter.get(invalidUrl)) + .verifyErrorMessage(exceptionMessage) } } }) |