diff options
Diffstat (limited to 'sources/hv-collector-core')
10 files changed, 339 insertions, 216 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index 29e1ea94..c21f2ed2 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -3,7 +3,7 @@ ~ ============LICENSE_START======================================================= ~ dcaegen2-collectors-veshv ~ ================================================================================ - ~ Copyright (C) 2018 NOKIA + ~ Copyright (C) 2018-2019 NOKIA ~ ================================================================================ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ ~ ============LICENSE_END========================================================= --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <licenses> @@ -85,6 +85,10 @@ <version>${project.parent.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>cbs-client</artifactId> + </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> @@ -114,15 +118,6 @@ <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> - <dependency> - <groupId>javax.json</groupId> - <artifactId>javax.json-api</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <scope>runtime</scope> - </dependency> </dependencies> </project> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 535d1baa..633095dc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.concurrent.atomic.AtomicReference @@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider, val config: AtomicReference<CollectorConfiguration> = AtomicReference() configuration() .doOnNext { - logger.info { "Using updated configuration for new connections" } + logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error { "Failed to acquire configuration from consul" } + logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } + logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } .subscribe(config::set) return object : CollectorProvider { override fun invoke(ctx: ClientContext): Option<Collector> = - config.getOption().map { createVesHvCollector(it, ctx) } + config.getOption().map { createVesHvCollector(it, ctx) } override fun close() = sinkProvider.close() } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 75b6f0a6..312d6d7b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.KafkaConfiguration -import reactor.netty.http.client.HttpClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,8 +39,8 @@ object AdapterFactory { else KafkaSinkProvider(kafkaConfig) - fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) - - private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) + fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + ConfigurationProviderImpl( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + configurationProviderParams) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt new file mode 100644 index 00000000..736f474a --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.google.gson.JsonObject +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry +import java.time.Duration + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>, + private val firstRequestDelay: Duration, + private val requestInterval: Duration, + private val healthState: HealthState, + retrySpec: Retry<Any> + +) : ConfigurationProvider { + constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this( + cbsClientMono, + params.firstRequestDelay, + params.requestInterval, + HealthState.INSTANCE, + Retry.any<Any>() + .retryMax(MAX_RETRIES) + .fixedBackoff(params.requestInterval) + .jitter(Jitter.random()) + ) + + private val retry = retrySpec.doOnRetry { + logger.withWarn(ServiceContext::mdc) { + log("Exception from configuration provider client, retrying subscription", it.exception()) + } + healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + } + + override fun invoke(): Flux<CollectorConfiguration> = + cbsClientMono + .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } + .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry) + .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } + .flatMapMany(::handleUpdates) + + private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient + .updates(RequestDiagnosticContext.create(), + firstRequestDelay, + requestInterval) + .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } + .map(::createCollectorConfiguration) + .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } + .retryWhen(retry) + + + private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + try { + val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) + CollectorConfiguration( + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject + defineRoute { + fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() + ) + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } + + private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString + + + companion object { + private const val ROUTING_CONFIGURATION_KEY = "collector.routing" + private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" + private const val TOPIC_CONFIGURATION_KEY = "toTopic" + + private const val MAX_RETRIES = 5L + private val logger = Logger(ConfigurationProviderImpl::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt deleted file mode 100644 index d58cc792..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ /dev/null @@ -1,145 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.model.routing -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.io.StringReader -import java.security.MessageDigest -import java.time.Duration -import java.util.* -import java.util.concurrent.atomic.AtomicReference -import javax.json.Json -import javax.json.JsonObject - - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal class ConsulConfigurationProvider(private val http: HttpAdapter, - private val url: String, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - retrySpec: Retry<Any> - -) : ConfigurationProvider { - private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf()) - private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - } - - constructor(http: HttpAdapter, - params: ConfigurationProviderParams) : this( - http, - params.configurationUrl, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random()) - ) - - override fun invoke(): Flux<CollectorConfiguration> = - Flux.interval(firstRequestDelay, requestInterval) - .concatMap { askForConfig() } - .flatMap(::filterDifferentValues) - .map(::parseJsonResponse) - .map(::createCollectorConfiguration) - .retryWhen(retry) - - private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer { - val invocationId = UUID.randomUUID() - http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) } - } - - private fun filterDifferentValues(configuration: BodyWithInvocationId) = - configuration.body.let { configurationString -> - configurationString.sha256().let { newHash -> - if (newHash contentEquals lastConfigurationHash.get()) { - logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "No change detected in consul configuration" - } - Mono.empty() - } else { - logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "Obtained new configuration from consul:\n$configurationString" - } - lastConfigurationHash.set(newHash) - Mono.just(configurationString) - } - } - } - - private fun parseJsonResponse(responseString: String): JsonObject = - Json.createReader(StringReader(responseString)).readObject() - - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = - try { - val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY) - CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject() - defineRoute { - fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - ) - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse consul configuration", e) - } - - - companion object { - private const val ROUTING_CONFIGURATION_KEY = "collector.routing" - private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" - private const val TOPIC_CONFIGURATION_KEY = "toTopic" - - private const val MAX_RETRIES = 5L - private const val BACKOFF_INTERVAL_FACTOR = 30L - private val logger = Logger(ConsulConfigurationProvider::class) - private fun String.sha256() = - MessageDigest - .getInstance("SHA-256") - .digest(toByteArray()) - - } - - private data class BodyWithInvocationId(val body: String, val invocationId: UUID) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt index 91f502e6..a1e5b8fd 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt @@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett withConnectionFrom(nettyInbound) { connection -> clientContext.clientAddress = Try { connection.address().address }.toOption() clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } - }
\ No newline at end of file + } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt index 9de34498..ac7a9db0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,5 @@ import java.time.Duration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since July 2018 */ -data class ConfigurationProviderParams(val configurationUrl: String, - val firstRequestDelay: Duration, +data class ConfigurationProviderParams(val firstRequestDelay: Duration, val requestInterval: Duration) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt index ccae3c99..21aaa129 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import com.google.gson.JsonParser import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock @@ -29,11 +30,12 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.mockito.Mockito import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Retry @@ -44,24 +46,36 @@ import java.time.Duration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since May 2018 */ -internal object ConsulConfigurationProviderTest : Spek({ +internal object ConfigurationProviderImplTest : Spek({ - describe("Consul configuration provider") { + describe("Configuration provider") { - val httpAdapterMock: HttpAdapter = mock() + val cbsClient: CbsClient = mock() + val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient) val healthStateProvider = HealthState.INSTANCE - given("valid resource url") { - val validUrl = "http://valid-url/" - val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) + given("configuration is never in cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) - on("call to consul") { - whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap())) - .thenReturn(Mono.just(constructConsulResponse())) + on("waiting for configuration") { + val waitTime = Duration.ofMillis(100) + it("should not get it") { + StepVerifier.create(configProvider().take(1)) + .expectNoEvent(waitTime) + } + } + + } + given("valid configuration from cbs") { + val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) + + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(validConfiguration)) it("should use received configuration") { - StepVerifier.create(consulConfigProvider().take(1)) + StepVerifier.create(configProvider().take(1)) .consumeNextWith { val route1 = it.routing.routes[0] @@ -85,22 +99,19 @@ internal object ConsulConfigurationProviderTest : Spek({ } } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - + given("invalid configuration from cbs") { val iterationCount = 3L - val consulConfigProvider = constructConsulConfigProvider( - invalidUrl, httpAdapterMock, healthStateProvider, iterationCount + val configProvider = constructConfigurationProvider( + cbsClientMock, healthStateProvider, iterationCount ) - on("call to consul") { - whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap())) - .thenReturn(Mono.error(RuntimeException("Test exception"))) + on("new configuration") { + whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval))) + .thenReturn(Flux.just(invalidConfiguration)) it("should interrupt the flux") { - - StepVerifier.create(consulConfigProvider()) - .verifyErrorMessage("Test exception") + StepVerifier.create(configProvider()) + .verifyError() } it("should update the health state") { @@ -115,28 +126,9 @@ internal object ConsulConfigurationProviderTest : Spek({ }) -private fun constructConsulConfigProvider(url: String, - httpAdapter: HttpAdapter, - healthState: HealthState, - iterationCount: Long = 1 -): ConsulConfigurationProvider { - - val firstRequestDelay = Duration.ofMillis(1) - val requestInterval = Duration.ofMillis(1) - val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - return ConsulConfigurationProvider( - httpAdapter, - url, - firstRequestDelay, - requestInterval, - healthState, - retry - ) -} - -fun constructConsulResponse(): String = - """{ +private val validConfiguration = JsonParser().parse(""" +{ "whatever": "garbage", "collector.routing": [ { @@ -148,4 +140,34 @@ fun constructConsulResponse(): String = "toTopic": "test-topic-2" } ] - }""" +}""").asJsonObject + +private val invalidConfiguration = JsonParser().parse(""" +{ + "whatever": "garbage", + "collector.routing": [ + { + "fromDomain": "garbage", + "meaningful": "garbage" + } + ] +}""").asJsonObject + +private val firstRequestDelay = Duration.ofMillis(1) +private val requestInterval = Duration.ofMillis(1) + +private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>, + healthState: HealthState, + iterationCount: Long = 1 +): ConfigurationProviderImpl { + + val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return ConfigurationProviderImpl( + cbsClientMono, + firstRequestDelay, + requestInterval, + healthState, + retry + ) +} diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt new file mode 100644 index 00000000..63caaf0a --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import com.google.protobuf.MessageLite +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.ves.VesEventOuterClass.CommonEventHeader.* + + +class ProtobufSerializerTest : Spek({ + + describe("ProtobufSerializerTest") { + val serializer = ProtobufSerializer() + + on("serialize") { + it("should return byte array from WTP Frame paylaod") { + val header = getDefaultInstance() + val payload = header.toByteArray() + val msg: MessageLite = mock() + + serializer.serialize("", msg) + + verify(msg).toByteArray() + } + } + + on("configuring") { + it("should do nothing") { + // increase code coverage + serializer.configure(hashMapOf<String, String>(), false) + } + } + + on("closing") { + it("should do nothing") { + // increase code coverage + serializer.close() + } + } + } + + +})
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt new file mode 100644 index 00000000..3a194b47 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.ves.VesEventOuterClass.CommonEventHeader.* + + +class VesMessageSerializerTest : Spek({ + + describe("VesMessageSerializer") { + val serializer = VesMessageSerializer() + + on("serialize") { + it("should return byte array from WTP Frame paylaod") { + val header = getDefaultInstance() + val payload = header.toByteArray() + val msg = VesMessage(header, WireFrameMessage(payload)) + + val serialized = serializer.serialize("", msg) + + assertThat(serialized).isEqualTo(payload) + } + } + + on("configuring") { + it("should do nothing") { + // increase code coverage + serializer.configure(hashMapOf<String, String>(), false) + } + } + + on("closing") { + it("should do nothing") { + // increase code coverage + serializer.close() + } + } + } + + + +})
\ No newline at end of file |