diff options
Diffstat (limited to 'sources/hv-collector-core/src/main')
6 files changed, 134 insertions, 159 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 535d1baa..633095dc 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.concurrent.atomic.AtomicReference @@ -53,18 +54,19 @@ class CollectorFactory(val configuration: ConfigurationProvider, val config: AtomicReference<CollectorConfiguration> = AtomicReference() configuration() .doOnNext { - logger.info { "Using updated configuration for new connections" } + logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } healthState.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error { "Failed to acquire configuration from consul" } + logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } + logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } .subscribe(config::set) return object : CollectorProvider { override fun invoke(ctx: ClientContext): Option<Collector> = - config.getOption().map { createVesHvCollector(it, ctx) } + config.getOption().map { createVesHvCollector(it, ctx) } override fun close() = sinkProvider.close() } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 75b6f0a6..312d6d7b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,8 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.KafkaConfiguration -import reactor.netty.http.client.HttpClient +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -38,8 +39,8 @@ object AdapterFactory { else KafkaSinkProvider(kafkaConfig) - fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) - - private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) + fun configurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = + ConfigurationProviderImpl( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + configurationProviderParams) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt new file mode 100644 index 00000000..736f474a --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.google.gson.JsonObject +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.retry.Jitter +import reactor.retry.Retry +import java.time.Duration + + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>, + private val firstRequestDelay: Duration, + private val requestInterval: Duration, + private val healthState: HealthState, + retrySpec: Retry<Any> + +) : ConfigurationProvider { + constructor(cbsClientMono: Mono<CbsClient>, params: ConfigurationProviderParams) : this( + cbsClientMono, + params.firstRequestDelay, + params.requestInterval, + HealthState.INSTANCE, + Retry.any<Any>() + .retryMax(MAX_RETRIES) + .fixedBackoff(params.requestInterval) + .jitter(Jitter.random()) + ) + + private val retry = retrySpec.doOnRetry { + logger.withWarn(ServiceContext::mdc) { + log("Exception from configuration provider client, retrying subscription", it.exception()) + } + healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) + } + + override fun invoke(): Flux<CollectorConfiguration> = + cbsClientMono + .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } } + .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" } + .retryWhen(retry) + .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } } + .flatMapMany(::handleUpdates) + + private fun handleUpdates(cbsClient: CbsClient): Flux<CollectorConfiguration> = cbsClient + .updates(RequestDiagnosticContext.create(), + firstRequestDelay, + requestInterval) + .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } } + .map(::createCollectorConfiguration) + .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" } + .retryWhen(retry) + + + private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + try { + val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY) + CollectorConfiguration( + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject + defineRoute { + fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() + ) + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } + + private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString + + + companion object { + private const val ROUTING_CONFIGURATION_KEY = "collector.routing" + private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" + private const val TOPIC_CONFIGURATION_KEY = "toTopic" + + private const val MAX_RETRIES = 5L + private val logger = Logger(ConfigurationProviderImpl::class) + } +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt deleted file mode 100644 index d58cc792..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ /dev/null @@ -1,145 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.model.routing -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.io.StringReader -import java.security.MessageDigest -import java.time.Duration -import java.util.* -import java.util.concurrent.atomic.AtomicReference -import javax.json.Json -import javax.json.JsonObject - - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal class ConsulConfigurationProvider(private val http: HttpAdapter, - private val url: String, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - retrySpec: Retry<Any> - -) : ConfigurationProvider { - private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf()) - private val retry = retrySpec.doOnRetry { - logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) } - healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION) - } - - constructor(http: HttpAdapter, - params: ConfigurationProviderParams) : this( - http, - params.configurationUrl, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random()) - ) - - override fun invoke(): Flux<CollectorConfiguration> = - Flux.interval(firstRequestDelay, requestInterval) - .concatMap { askForConfig() } - .flatMap(::filterDifferentValues) - .map(::parseJsonResponse) - .map(::createCollectorConfiguration) - .retryWhen(retry) - - private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer { - val invocationId = UUID.randomUUID() - http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) } - } - - private fun filterDifferentValues(configuration: BodyWithInvocationId) = - configuration.body.let { configurationString -> - configurationString.sha256().let { newHash -> - if (newHash contentEquals lastConfigurationHash.get()) { - logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "No change detected in consul configuration" - } - Mono.empty() - } else { - logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "Obtained new configuration from consul:\n$configurationString" - } - lastConfigurationHash.set(newHash) - Mono.just(configurationString) - } - } - } - - private fun parseJsonResponse(responseString: String): JsonObject = - Json.createReader(StringReader(responseString)).readObject() - - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = - try { - val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY) - CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject() - defineRoute { - fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY)) - toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY)) - withFixedPartitioning() - } - } - }.build() - ) - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse consul configuration", e) - } - - - companion object { - private const val ROUTING_CONFIGURATION_KEY = "collector.routing" - private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" - private const val TOPIC_CONFIGURATION_KEY = "toTopic" - - private const val MAX_RETRIES = 5L - private const val BACKOFF_INTERVAL_FACTOR = 30L - private val logger = Logger(ConsulConfigurationProvider::class) - private fun String.sha256() = - MessageDigest - .getInstance("SHA-256") - .digest(toByteArray()) - - } - - private data class BodyWithInvocationId(val body: String, val invocationId: UUID) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt index 91f502e6..a1e5b8fd 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt @@ -78,4 +78,4 @@ internal fun populateClientContextFromInbound(clientContext: ClientContext, nett withConnectionFrom(nettyInbound) { connection -> clientContext.clientAddress = Try { connection.address().address }.toOption() clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } - }
\ No newline at end of file + } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt index 9de34498..ac7a9db0 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,5 @@ import java.time.Duration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since July 2018 */ -data class ConfigurationProviderParams(val configurationUrl: String, - val firstRequestDelay: Duration, +data class ConfigurationProviderParams(val firstRequestDelay: Duration, val requestInterval: Duration) |