summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt49
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt22
2 files changed, 49 insertions, 22 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 717da092..87399caf 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -54,11 +54,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
) : ConfigurationProvider {
private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
- private val retry = retrySpec
- .doOnRetry {
- logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
- healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- }
+ private val retry = retrySpec.doOnRetry {
+ logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ }
constructor(http: HttpAdapter,
params: ConfigurationProviderParams) : this(
@@ -96,7 +95,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
Mono.empty()
} else {
logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
- "Obtained new configuration from consul:\n${configurationString}"
+ "Obtained new configuration from consul:\n$configurationString"
}
lastConfigurationHash.set(newHash)
Mono.just(configurationString)
@@ -107,28 +106,34 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private fun parseJsonResponse(responseString: String): JsonObject =
Json.createReader(StringReader(responseString)).readObject()
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- val routingArray = configuration.getJsonArray("collector.routing")
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
+ try {
+ val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
+ CollectorConfiguration(
+ routing {
+ for (route in routingArray) {
+ val routeObj = route.asJsonObject()
+ defineRoute {
+ fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
+ toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
+ )
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse consul configuration", e)
+ }
- return CollectorConfiguration(
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject()
- defineRoute {
- fromDomain(routeObj.getString("fromDomain"))
- toTopic(routeObj.getString("toTopic"))
- withFixedPartitioning()
- }
- }
- }.build()
- )
- }
companion object {
+ private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
+ private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
+ private const val TOPIC_CONFIGURATION_KEY = "toTopic"
+
private const val MAX_RETRIES = 5L
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
-
private fun String.sha256() =
MessageDigest
.getInstance("SHA-256")
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt
new file mode 100644
index 00000000..2b123fc8
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt
@@ -0,0 +1,22 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+class ParsingException(message: String, cause: Throwable) : Exception(message, cause)