aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/pom.xml19
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt13
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt118
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt145
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt5
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt)112
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt65
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt66
10 files changed, 339 insertions, 216 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index 29e1ea94..c21f2ed2 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -3,7 +3,7 @@
~ ============LICENSE_START=======================================================
~ dcaegen2-collectors-veshv
~ ================================================================================
- ~ Copyright (C) 2018 NOKIA
+ ~ Copyright (C) 2018-2019 NOKIA
~ ================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -19,8 +19,8 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<licenses>
@@ -85,6 +85,10 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
@@ -114,15 +118,6 @@
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
- <dependency>
- <groupId>javax.json</groupId>
- <artifactId>javax.json-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- <scope>runtime</scope>
- </dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 535d1baa..633095dc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
@@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider,
val config: AtomicReference<CollectorConfiguration> = AtomicReference()
configuration()
.doOnNext {
- logger.info { "Using updated configuration for new connections" }
+ logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error { "Failed to acquire configuration from consul" }
+ logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+ logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
return object : CollectorProvider {
override fun invoke(ctx: ClientContext): Option<Collector> =
- config.getOption().map { createVesHvCollector(it, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
override fun close() = sinkProvider.close()
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 75b6f0a6..312d6d7b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
-import reactor.netty.http.client.HttpClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,8 +39,8 @@ object AdapterFactory {
else
KafkaSinkProvider(kafkaConfig)
- fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
-
- private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+ fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConfigurationProviderImpl(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ configurationProviderParams)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
new file mode 100644
index 00000000..736f474a
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration,
+ private val healthState: HealthState,
+ retrySpec: Retry<Any>
+
+) : ConfigurationProvider {
+ constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+ cbsClientMono,
+ params.firstRequestDelay,
+ params.requestInterval,
+ HealthState.INSTANCE,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval)
+ .jitter(Jitter.random())
+ )
+
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(ServiceContext::mdc) {
+ log("Exception from configuration provider client, retrying subscription", it.exception())
+ }
+ healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ }
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ cbsClientMono
+ .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry)
+ .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
+ .flatMapMany(::handleUpdates)
+
+ private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+ .updates(RequestDiagnosticContext.create(),
+ firstRequestDelay,
+ requestInterval)
+ .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
+ .map(::createCollectorConfiguration)
+ .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
+ .retryWhen(retry)
+
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ try {
+ val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
+ CollectorConfiguration(
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject
+ defineRoute {
+ fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
+ )
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
+
+ private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+
+
+ companion object {
+ private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
+ private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
+ private const val TOPIC_CONFIGURATION_KEY = "toTopic"
+
+ private const val MAX_RETRIES = 5L
+ private val logger = Logger(ConfigurationProviderImpl::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
deleted file mode 100644
index d58cc792..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.io.StringReader
-import java.security.MessageDigest
-import java.time.Duration
-import java.util.*
-import java.util.concurrent.atomic.AtomicReference
-import javax.json.Json
-import javax.json.JsonObject
-
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConsulConfigurationProvider(private val http: HttpAdapter,
- private val url: String,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
- private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
- private val retry = retrySpec.doOnRetry {
- logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
- healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- }
-
- constructor(http: HttpAdapter,
- params: ConfigurationProviderParams) : this(
- http,
- params.configurationUrl,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random())
- )
-
- override fun invoke(): Flux<CollectorConfiguration> =
- Flux.interval(firstRequestDelay, requestInterval)
- .concatMap { askForConfig() }
- .flatMap(::filterDifferentValues)
- .map(::parseJsonResponse)
- .map(::createCollectorConfiguration)
- .retryWhen(retry)
-
- private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
- val invocationId = UUID.randomUUID()
- http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
- }
-
- private fun filterDifferentValues(configuration: BodyWithInvocationId) =
- configuration.body.let { configurationString ->
- configurationString.sha256().let { newHash ->
- if (newHash contentEquals lastConfigurationHash.get()) {
- logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "No change detected in consul configuration"
- }
- Mono.empty()
- } else {
- logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "Obtained new configuration from consul:\n$configurationString"
- }
- lastConfigurationHash.set(newHash)
- Mono.just(configurationString)
- }
- }
- }
-
- private fun parseJsonResponse(responseString: String): JsonObject =
- Json.createReader(StringReader(responseString)).readObject()
-
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
- try {
- val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
- CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject()
- defineRoute {
- fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- )
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse consul configuration", e)
- }
-
-
- companion object {
- private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
- private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
- private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
- private const val MAX_RETRIES = 5L
- private const val BACKOFF_INTERVAL_FACTOR = 30L
- private val logger = Logger(ConsulConfigurationProvider::class)
- private fun String.sha256() =
- MessageDigest
- .getInstance("SHA-256")
- .digest(toByteArray())
-
- }
-
- private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
index 91f502e6..a1e5b8fd 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
@@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett
withConnectionFrom(nettyInbound) { connection ->
clientContext.clientAddress = Try { connection.address().address }.toOption()
clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
- } \ No newline at end of file
+ }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
index 9de34498..ac7a9db0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,5 @@ import java.time.Duration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since July 2018
*/
-data class ConfigurationProviderParams(val configurationUrl: String,
- val firstRequestDelay: Duration,
+data class ConfigurationProviderParams(val firstRequestDelay: Duration,
val requestInterval: Duration)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index ccae3c99..21aaa129 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import com.google.gson.JsonParser
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
@@ -29,11 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.mockito.Mockito
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -44,24 +46,36 @@ import java.time.Duration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since May 2018
*/
-internal object ConsulConfigurationProviderTest : Spek({
+internal object ConfigurationProviderImplTest : Spek({
- describe("Consul configuration provider") {
+ describe("Configuration provider") {
- val httpAdapterMock: HttpAdapter = mock()
+ val cbsClient: CbsClient = mock()
+ val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
val healthStateProvider = HealthState.INSTANCE
- given("valid resource url") {
- val validUrl = "http://valid-url/"
- val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+ given("configuration is never in cbs") {
+ val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
- on("call to consul") {
- whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
- .thenReturn(Mono.just(constructConsulResponse()))
+ on("waiting for configuration") {
+ val waitTime = Duration.ofMillis(100)
+ it("should not get it") {
+ StepVerifier.create(configProvider().take(1))
+ .expectNoEvent(waitTime)
+ }
+ }
+
+ }
+ given("valid configuration from cbs") {
+ val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+
+ on("new configuration") {
+ whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ .thenReturn(Flux.just(validConfiguration))
it("should use received configuration") {
- StepVerifier.create(consulConfigProvider().take(1))
+ StepVerifier.create(configProvider().take(1))
.consumeNextWith {
val route1 = it.routing.routes[0]
@@ -85,22 +99,19 @@ internal object ConsulConfigurationProviderTest : Spek({
}
}
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
-
+ given("invalid configuration from cbs") {
val iterationCount = 3L
- val consulConfigProvider = constructConsulConfigProvider(
- invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ val configProvider = constructConfigurationProvider(
+ cbsClientMock, healthStateProvider, iterationCount
)
- on("call to consul") {
- whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
- .thenReturn(Mono.error(RuntimeException("Test exception")))
+ on("new configuration") {
+ whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+ .thenReturn(Flux.just(invalidConfiguration))
it("should interrupt the flux") {
-
- StepVerifier.create(consulConfigProvider())
- .verifyErrorMessage("Test exception")
+ StepVerifier.create(configProvider())
+ .verifyError()
}
it("should update the health state") {
@@ -115,28 +126,9 @@ internal object ConsulConfigurationProviderTest : Spek({
})
-private fun constructConsulConfigProvider(url: String,
- httpAdapter: HttpAdapter,
- healthState: HealthState,
- iterationCount: Long = 1
-): ConsulConfigurationProvider {
-
- val firstRequestDelay = Duration.ofMillis(1)
- val requestInterval = Duration.ofMillis(1)
- val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
- return ConsulConfigurationProvider(
- httpAdapter,
- url,
- firstRequestDelay,
- requestInterval,
- healthState,
- retry
- )
-}
-
-fun constructConsulResponse(): String =
- """{
+private val validConfiguration = JsonParser().parse("""
+{
"whatever": "garbage",
"collector.routing": [
{
@@ -148,4 +140,34 @@ fun constructConsulResponse(): String =
"toTopic": "test-topic-2"
}
]
- }"""
+}""").asJsonObject
+
+private val invalidConfiguration = JsonParser().parse("""
+{
+ "whatever": "garbage",
+ "collector.routing": [
+ {
+ "fromDomain": "garbage",
+ "meaningful": "garbage"
+ }
+ ]
+}""").asJsonObject
+
+private val firstRequestDelay = Duration.ofMillis(1)
+private val requestInterval = Duration.ofMillis(1)
+
+private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+ healthState: HealthState,
+ iterationCount: Long = 1
+): ConfigurationProviderImpl {
+
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConfigurationProviderImpl(
+ cbsClientMono,
+ firstRequestDelay,
+ requestInterval,
+ healthState,
+ retry
+ )
+}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
new file mode 100644
index 00000000..63caaf0a
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializerTest.kt
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+
+class ProtobufSerializerTest : Spek({
+
+ describe("ProtobufSerializerTest") {
+ val serializer = ProtobufSerializer()
+
+ on("serialize") {
+ it("should return byte array from WTP Frame paylaod") {
+ val header = getDefaultInstance()
+ val payload = header.toByteArray()
+ val msg: MessageLite = mock()
+
+ serializer.serialize("", msg)
+
+ verify(msg).toByteArray()
+ }
+ }
+
+ on("configuring") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.configure(hashMapOf<String, String>(), false)
+ }
+ }
+
+ on("closing") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.close()
+ }
+ }
+ }
+
+
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
new file mode 100644
index 00000000..3a194b47
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+
+class VesMessageSerializerTest : Spek({
+
+ describe("VesMessageSerializer") {
+ val serializer = VesMessageSerializer()
+
+ on("serialize") {
+ it("should return byte array from WTP Frame paylaod") {
+ val header = getDefaultInstance()
+ val payload = header.toByteArray()
+ val msg = VesMessage(header, WireFrameMessage(payload))
+
+ val serialized = serializer.serialize("", msg)
+
+ assertThat(serialized).isEqualTo(payload)
+ }
+ }
+
+ on("configuring") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.configure(hashMapOf<String, String>(), false)
+ }
+ }
+
+ on("closing") {
+ it("should do nothing") {
+ // increase code coverage
+ serializer.close()
+ }
+ }
+ }
+
+
+
+}) \ No newline at end of file