diff options
Diffstat (limited to 'sources')
2 files changed, 49 insertions, 22 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 717da092..87399caf 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -54,11 +54,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, ) : ConfigurationProvider { private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf()) - private val retry = retrySpec - .doOnRetry { - logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) } - healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - } + private val retry = retrySpec.doOnRetry { + logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) } + healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + } constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this( @@ -96,7 +95,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Mono.empty() } else { logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { - "Obtained new configuration from consul:\n${configurationString}" + "Obtained new configuration from consul:\n$configurationString" } lastConfigurationHash.set(newHash) Mono.just(configurationString) @@ -107,28 +106,34 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private fun parseJsonResponse(responseString: String): JsonObject = Json.createReader(StringReader(responseString)).readObject() - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - val routingArray = configuration.getJsonArray("collector.routing") + private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration = + try { + val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY) + CollectorConfiguration( + routing { + for (route in routingArray) { + val routeObj = route.asJsonObject() + defineRoute { + fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY)) + toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY)) + withFixedPartitioning() + } + } + }.build() + ) + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse consul configuration", e) + } - return CollectorConfiguration( - routing { - for (route in routingArray) { - val routeObj = route.asJsonObject() - defineRoute { - fromDomain(routeObj.getString("fromDomain")) - toTopic(routeObj.getString("toTopic")) - withFixedPartitioning() - } - } - }.build() - ) - } companion object { + private const val ROUTING_CONFIGURATION_KEY = "collector.routing" + private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" + private const val TOPIC_CONFIGURATION_KEY = "toTopic" + private const val MAX_RETRIES = 5L private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) - private fun String.sha256() = MessageDigest .getInstance("SHA-256") diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt new file mode 100644 index 00000000..2b123fc8 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt @@ -0,0 +1,22 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +class ParsingException(message: String, cause: Throwable) : Exception(message, cause) |