aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt2
4 files changed, 57 insertions, 41 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
index 21b79bbe..954de978 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
@@ -27,21 +27,21 @@ import reactor.core.publisher.Flux
@Suppress("TooManyFunctions")
internal object ClientContextLogging {
- fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
- fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
- fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
- fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
- fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+ fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
+ fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
+ fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
+ fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block)
+ fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block)
- fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
- fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
- fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
- fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
- fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+ fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message)
+ fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message)
+ fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message)
+ fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message)
+ fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message)
fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+ return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index bbaa47c4..14d511be 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -24,13 +24,16 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
+import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
@@ -52,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.withWarn { log("Could not get fresh configuration", it.exception()) }
+ logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
@@ -77,17 +80,26 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
.map(::createCollectorConfiguration)
.retryWhen(retry)
- private fun askForConfig(): Mono<String> = http.get(url)
+ private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
+ val invocationId = UUID.randomUUID()
+ http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
+ }
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- logger.trace { "No change detected in consul configuration" }
- Mono.empty()
- } else {
- logger.info { "Obtained new configuration from consul:\n${configurationString}" }
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
+ private fun filterDifferentValues(configuration: BodyWithInvocationId) =
+ configuration.body.let { configurationString ->
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "No change detected in consul configuration"
+ }
+ Mono.empty()
+ } else {
+ logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "Obtained new configuration from consul:\n${configurationString}"
+ }
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
}
}
@@ -119,5 +131,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
-}
+ private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 3fefc6e8..51f7410b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import io.netty.handler.codec.http.HttpStatusClass
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
+import java.util.*
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -31,21 +33,23 @@ import reactor.netty.http.client.HttpClient
*/
open class HttpAdapter(private val httpClient: HttpClient) {
- open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get()
- .uri(url + createQueryString(queryParams))
- .responseSingle { response, content ->
- if (response.status().codeClass() == HttpStatusClass.SUCCESS)
- content.asString()
- else {
- val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
- Mono.error(IllegalStateException(errorMessage))
- }
- }
- .doOnError {
- logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
- logger.withDebug { log("Nested exception:", it) }
- }
+ open fun get(url: String, invocationId: UUID, queryParams: Map<String, Any> = emptyMap()): Mono<String> =
+ httpClient
+ .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() }
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else {
+ val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+ Mono.error(IllegalStateException(errorMessage))
+ }
+ }
+ .doOnError {
+ logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+ logger.withDebug { log("Nested exception:", it) }
+ }
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
@@ -65,8 +69,7 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
companion object {
-
-
private val logger = Logger(HttpAdapter::class)
+ const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}"
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 690a7d1e..b4f9a90c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::asMap, Marker.INVOKE) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}