summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt13
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt118
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt145
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt5
6 files changed, 134 insertions, 159 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 535d1baa..633095dc 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
@@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider,
val config: AtomicReference<CollectorConfiguration> = AtomicReference()
configuration()
.doOnNext {
- logger.info { "Using updated configuration for new connections" }
+ logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error { "Failed to acquire configuration from consul" }
+ logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+ logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
return object : CollectorProvider {
override fun invoke(ctx: ClientContext): Option<Collector> =
- config.getOption().map { createVesHvCollector(it, ctx) }
+ config.getOption().map { createVesHvCollector(it, ctx) }
override fun close() = sinkProvider.close()
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 75b6f0a6..312d6d7b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
-import reactor.netty.http.client.HttpClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -38,8 +39,8 @@ object AdapterFactory {
else
KafkaSinkProvider(kafkaConfig)
- fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
-
- private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+ fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConfigurationProviderImpl(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ configurationProviderParams)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
new file mode 100644
index 00000000..736f474a
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.google.gson.JsonObject
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration,
+ private val healthState: HealthState,
+ retrySpec: Retry<Any>
+
+) : ConfigurationProvider {
+ constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this(
+ cbsClientMono,
+ params.firstRequestDelay,
+ params.requestInterval,
+ HealthState.INSTANCE,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval)
+ .jitter(Jitter.random())
+ )
+
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(ServiceContext::mdc) {
+ log("Exception from configuration provider client, retrying subscription", it.exception())
+ }
+ healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ }
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ cbsClientMono
+ .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
+ .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
+ .retryWhen(retry)
+ .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
+ .flatMapMany(::handleUpdates)
+
+ private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient
+ .updates(RequestDiagnosticContext.create(),
+ firstRequestDelay,
+ requestInterval)
+ .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
+ .map(::createCollectorConfiguration)
+ .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
+ .retryWhen(retry)
+
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ try {
+ val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
+ CollectorConfiguration(
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject
+ defineRoute {
+ fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
+ )
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
+
+ private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+
+
+ companion object {
+ private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
+ private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
+ private const val TOPIC_CONFIGURATION_KEY = "toTopic"
+
+ private const val MAX_RETRIES = 5L
+ private val logger = Logger(ConfigurationProviderImpl::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
deleted file mode 100644
index d58cc792..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.io.StringReader
-import java.security.MessageDigest
-import java.time.Duration
-import java.util.*
-import java.util.concurrent.atomic.AtomicReference
-import javax.json.Json
-import javax.json.JsonObject
-
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConsulConfigurationProvider(private val http: HttpAdapter,
- private val url: String,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
- private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
- private val retry = retrySpec.doOnRetry {
- logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
- healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- }
-
- constructor(http: HttpAdapter,
- params: ConfigurationProviderParams) : this(
- http,
- params.configurationUrl,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random())
- )
-
- override fun invoke(): Flux<CollectorConfiguration> =
- Flux.interval(firstRequestDelay, requestInterval)
- .concatMap { askForConfig() }
- .flatMap(::filterDifferentValues)
- .map(::parseJsonResponse)
- .map(::createCollectorConfiguration)
- .retryWhen(retry)
-
- private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
- val invocationId = UUID.randomUUID()
- http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
- }
-
- private fun filterDifferentValues(configuration: BodyWithInvocationId) =
- configuration.body.let { configurationString ->
- configurationString.sha256().let { newHash ->
- if (newHash contentEquals lastConfigurationHash.get()) {
- logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "No change detected in consul configuration"
- }
- Mono.empty()
- } else {
- logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "Obtained new configuration from consul:\n$configurationString"
- }
- lastConfigurationHash.set(newHash)
- Mono.just(configurationString)
- }
- }
- }
-
- private fun parseJsonResponse(responseString: String): JsonObject =
- Json.createReader(StringReader(responseString)).readObject()
-
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
- try {
- val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
- CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject()
- defineRoute {
- fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- )
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse consul configuration", e)
- }
-
-
- companion object {
- private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
- private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
- private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
- private const val MAX_RETRIES = 5L
- private const val BACKOFF_INTERVAL_FACTOR = 30L
- private val logger = Logger(ConsulConfigurationProvider::class)
- private fun String.sha256() =
- MessageDigest
- .getInstance("SHA-256")
- .digest(toByteArray())
-
- }
-
- private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
index 91f502e6..a1e5b8fd 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
@@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett
withConnectionFrom(nettyInbound) { connection ->
clientContext.clientAddress = Try { connection.address().address }.toOption()
clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
- } \ No newline at end of file
+ }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
index 9de34498..ac7a9db0 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,5 @@ import java.time.Duration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since July 2018
*/
-data class ConfigurationProviderParams(val configurationUrl: String,
- val firstRequestDelay: Duration,
+data class ConfigurationProviderParams(val firstRequestDelay: Duration,
val requestInterval: Duration)