aboutsummaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt48
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt45
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt3
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt5
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt26
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt98
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt67
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt49
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt25
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt35
16 files changed, 456 insertions, 89 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index cf73aed8..ca1605e6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -74,7 +74,7 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
- .filterFailedWithLog(logger, clientContext::asMap,
+ .filterFailedWithLog(logger, clientContext::fullMdc,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
@@ -88,7 +88,7 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
- .filterEmptyWithLog(logger, clientContext::asMap,
+ .filterEmptyWithLog(logger, clientContext::fullMdc,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
@@ -96,7 +96,7 @@ internal class VesHvCollector(
.also { logger.debug { "Released buffer memory after handling message stream" } }
fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
- filterFailedWithLog(logger, clientContext::asMap, predicate)
+ filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
index 21b79bbe..954de978 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
@@ -27,21 +27,21 @@ import reactor.core.publisher.Flux
@Suppress("TooManyFunctions")
internal object ClientContextLogging {
- fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
- fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
- fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
- fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
- fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+ fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
+ fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
+ fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
+ fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block)
+ fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block)
- fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
- fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
- fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
- fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
- fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+ fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message)
+ fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message)
+ fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message)
+ fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message)
+ fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message)
fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+ return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index bbaa47c4..14d511be 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -24,13 +24,16 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
+import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
@@ -52,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.withWarn { log("Could not get fresh configuration", it.exception()) }
+ logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
@@ -77,17 +80,26 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
.map(::createCollectorConfiguration)
.retryWhen(retry)
- private fun askForConfig(): Mono<String> = http.get(url)
+ private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
+ val invocationId = UUID.randomUUID()
+ http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
+ }
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- logger.trace { "No change detected in consul configuration" }
- Mono.empty()
- } else {
- logger.info { "Obtained new configuration from consul:\n${configurationString}" }
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
+ private fun filterDifferentValues(configuration: BodyWithInvocationId) =
+ configuration.body.let { configurationString ->
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "No change detected in consul configuration"
+ }
+ Mono.empty()
+ } else {
+ logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "Obtained new configuration from consul:\n${configurationString}"
+ }
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
}
}
@@ -119,5 +131,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
-}
+ private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 3fefc6e8..51f7410b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import io.netty.handler.codec.http.HttpStatusClass
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
+import java.util.*
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -31,21 +33,23 @@ import reactor.netty.http.client.HttpClient
*/
open class HttpAdapter(private val httpClient: HttpClient) {
- open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get()
- .uri(url + createQueryString(queryParams))
- .responseSingle { response, content ->
- if (response.status().codeClass() == HttpStatusClass.SUCCESS)
- content.asString()
- else {
- val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
- Mono.error(IllegalStateException(errorMessage))
- }
- }
- .doOnError {
- logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
- logger.withDebug { log("Nested exception:", it) }
- }
+ open fun get(url: String, invocationId: UUID, queryParams: Map<String, Any> = emptyMap()): Mono<String> =
+ httpClient
+ .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() }
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else {
+ val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+ Mono.error(IllegalStateException(errorMessage))
+ }
+ }
+ .doOnError {
+ logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+ logger.withDebug { log("Nested exception:", it) }
+ }
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
@@ -65,8 +69,7 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
companion object {
-
-
private val logger = Logger(HttpAdapter::class)
+ const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}"
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 690a7d1e..b4f9a90c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::asMap, Marker.INVOKE) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 6f02d43e..d8d786be 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.core.None
+import arrow.core.Option
import arrow.core.getOrElse
+import arrow.core.toOption
import arrow.effects.IO
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,6 +45,10 @@ import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
import java.time.Duration
+import java.lang.Exception
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -72,17 +81,21 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
val clientContext = ClientContext(nettyOutbound.alloc())
nettyInbound.withConnection {
- clientContext.clientAddress = it.address()
+ populateClientContext(clientContext, it)
+ it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
+ sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ }
+
}
- logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
Mono.empty()
},
{
- logger.info(clientContext::asMap) { "Handling new connection" }
+ logger.info(clientContext::fullMdc) { "Handling new connection" }
nettyInbound.withConnection { conn ->
conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
@@ -92,6 +105,29 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
)
}
+ private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
+ clientContext.clientAddress = try {
+ Option.fromNullable(connection.address().address)
+ } catch (ex: Exception) {
+ None
+ }
+ clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
+ }
+
+ private fun getSslSession(connection: Connection) = Option.fromNullable(
+ connection
+ .channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session)
+
+ private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
+ sslSession
+ .peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
@@ -108,7 +144,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
- logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
if (it.isSuccess)
logger.debug(ctx) { "Channel closed successfully." }
else
@@ -119,7 +155,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
// TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
- logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
}
return this
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
index 305e4cb1..7b082e64 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
@@ -19,10 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.model
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.security.cert.X509Certificate
import java.util.*
/**
@@ -31,13 +34,28 @@ import java.util.*
*/
data class ClientContext(
val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
- val clientId: String = UUID.randomUUID().toString(),
- var clientAddress: InetSocketAddress? = null) {
- fun asMap(): Map<String, String> {
- val result = mutableMapOf("clientId" to clientId)
- if (clientAddress != null) {
- result["clientAddress"] = clientAddress.toString()
- }
- return result
+ var clientAddress: Option<InetAddress> = None,
+ var clientCert: Option<X509Certificate> = None,
+ val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP
+ val invocationId: String = UUID.randomUUID().toString()) {
+
+ val mdc: Map<String, String>
+ get() = mapOf(
+ OnapMdc.REQUEST_ID to requestId,
+ OnapMdc.INVOCATION_ID to invocationId,
+ OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE,
+ OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE },
+ OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE }
+ )
+
+ val fullMdc: Map<String, String>
+ get() = mdc + ServiceContext.mdc
+
+ private fun clientDn(): Option<String> = clientCert.map { it.subjectX500Principal.toString() }
+ private fun clientIp(): Option<String> = clientAddress.map(InetAddress::getHostAddress)
+
+ companion object {
+ const val DEFAULT_STATUS_CODE = "INPROGRESS"
+ const val DEFAULT_VALUE = ""
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
new file mode 100644
index 00000000..2407eced
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.net.UnknownHostException
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object ServiceContext {
+ val instanceId = UUID.randomUUID().toString()
+ val serverFqdn = getHost().hostName
+
+ val mdc = mapOf(
+ OnapMdc.INSTANCE_ID to instanceId,
+ OnapMdc.SERVER_FQDN to serverFqdn
+ )
+
+ private fun getHost() = try {
+ InetAddress.getLocalHost()
+ } catch (ex: UnknownHostException) {
+ InetAddress.getLoopbackAddress()
+ }
+}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index cdee92c9..605e7a6e 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -19,9 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
import arrow.core.Try
-import arrow.core.success
import com.google.protobuf.ByteString
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
@@ -32,7 +30,6 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import reactor.test.test
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index c6364f74..9ce0c3db 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
@@ -57,7 +58,7 @@ internal object ConsulConfigurationProviderTest : Spek({
val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
on("call to consul") {
- whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
.thenReturn(Mono.just(constructConsulResponse()))
it("should use received configuration") {
@@ -97,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({
)
on("call to consul") {
- whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
.thenReturn(Mono.error(RuntimeException("Test exception")))
it("should interrupt the flux") {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
index 91457faf..f55b1fdf 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -23,11 +23,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.impl.adapters.HttpAdapter.Companion.INVOCATION_ID_HEADER
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.http.server.HttpServer
import reactor.test.StepVerifier
import reactor.test.test
+import java.util.*
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -42,6 +44,9 @@ internal object HttpAdapterTest : Spek({
routes.get("/url") { req, resp ->
resp.sendString(Mono.just(req.uri()))
}
+ routes.get("/inv-id") { req, resp ->
+ resp.sendString(Mono.just(req.requestHeaders()[INVOCATION_ID_HEADER]))
+ }
}
.bindNow()
val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
@@ -53,31 +58,46 @@ internal object HttpAdapterTest : Spek({
given("url without query params") {
val url = "/url"
+ val invocationId = UUID.randomUUID()
it("should not append query string") {
- httpAdapter.get(url).test()
+ httpAdapter.get(url, invocationId).test()
.expectNext(url)
.verifyComplete()
}
+
+ it("should pass invocation id") {
+ httpAdapter.get("/inv-id", invocationId).test()
+ .expectNext(invocationId.toString())
+ .verifyComplete()
+ }
}
given("url with query params") {
val queryParams = mapOf(Pair("p", "the-value"))
val url = "/url"
+ val invocationId = UUID.randomUUID()
it("should add them as query string to the url") {
- httpAdapter.get(url, queryParams).test()
+ httpAdapter.get(url, invocationId, queryParams).test()
.expectNext("/url?p=the-value")
.verifyComplete()
}
+
+ it("should pass invocation id") {
+ httpAdapter.get("/inv-id", invocationId, queryParams).test()
+ .expectNext(invocationId.toString())
+ .verifyComplete()
+ }
}
given("invalid url") {
val invalidUrl = "/wtf"
+ val invocationId = UUID.randomUUID()
it("should interrupt the flux") {
StepVerifier
- .create(httpAdapter.get(invalidUrl))
+ .create(httpAdapter.get(invalidUrl, invocationId))
.verifyError()
}
}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt
new file mode 100644
index 00000000..a49428a7
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.Inet4Address
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.security.cert.X509Certificate
+import java.util.*
+import javax.security.auth.x500.X500Principal
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object ClientContextTest : Spek({
+ describe("ClientContext") {
+ given("default instance") {
+ val cut = ClientContext()
+
+ on("mapped diagnostic context") {
+ val mdc = cut.mdc
+
+ it("should contain ${OnapMdc.REQUEST_ID}") {
+ assertThat(mdc[OnapMdc.REQUEST_ID]).isEqualTo(cut.requestId)
+ }
+
+ it("should contain ${OnapMdc.INVOCATION_ID}") {
+ assertThat(mdc[OnapMdc.INVOCATION_ID]).isEqualTo(cut.invocationId)
+ }
+
+ it("should contain ${OnapMdc.STATUS_CODE}") {
+ assertThat(mdc[OnapMdc.STATUS_CODE]).isEqualTo("INPROGRESS")
+ }
+
+ it("should contain ${OnapMdc.CLIENT_NAME}") {
+ assertThat(mdc[OnapMdc.CLIENT_NAME]).isBlank()
+ }
+
+ it("should contain ${OnapMdc.CLIENT_IP}") {
+ assertThat(mdc[OnapMdc.CLIENT_IP]).isBlank()
+ }
+ }
+ }
+
+ given("instance with client data") {
+ val clientDn = "C=PL, O=Nokia, CN=NokiaBTS"
+ val clientIp = "192.168.52.34"
+ val cert: X509Certificate = mock()
+ val principal: X500Principal = mock()
+ val cut = ClientContext(
+ clientAddress = Some(InetAddress.getByName(clientIp)),
+ clientCert = Some(cert))
+
+ whenever(cert.subjectX500Principal).thenReturn(principal)
+ whenever(principal.toString()).thenReturn(clientDn)
+
+ on("mapped diagnostic context") {
+ val mdc = cut.mdc
+
+ it("should contain ${OnapMdc.CLIENT_NAME}") {
+ assertThat(mdc[OnapMdc.CLIENT_NAME]).isEqualTo(clientDn)
+ }
+
+ it("should contain ${OnapMdc.CLIENT_IP}") {
+ assertThat(mdc[OnapMdc.CLIENT_IP]).isEqualTo(clientIp)
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt
new file mode 100644
index 00000000..5b6e4526
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object ServiceContextTest : Spek({
+ describe("ServiceContext") {
+ given("singleton instance") {
+ val cut = ServiceContext
+
+ on("instanceId") {
+ val instanceId = cut.instanceId
+ it("should be valid UUID") {
+ UUID.fromString(instanceId) // should not throw
+ }
+ }
+
+ on("serverFqdn") {
+ val serverFqdn = cut.serverFqdn
+ it("should be non empty") {
+ assertThat(serverFqdn).isNotBlank()
+ }
+ }
+
+ on("mapped diagnostic context") {
+ val mdc = cut.mdc
+
+ it("should contain ${OnapMdc.INSTANCE_ID}") {
+ assertThat(mdc[OnapMdc.INSTANCE_ID]).isEqualTo(cut.instanceId)
+ }
+
+ it("should contain ${OnapMdc.SERVER_FQDN}") {
+ assertThat(mdc[OnapMdc.SERVER_FQDN]).isEqualTo(cut.serverFqdn)
+ }
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index 1e5c9c55..938ba793 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -127,6 +127,7 @@ abstract class AtLevelLogger {
abstract fun log(message: String)
abstract fun log(message: String, t: Throwable)
abstract fun log(marker: Marker, message: String)
+
open val enabled: Boolean
get() = true
@@ -140,6 +141,19 @@ abstract class AtLevelLogger {
}
}
}
+
+ protected fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) {
+ if (mdc.isEmpty()) {
+ block()
+ } else {
+ try {
+ mdc.forEach(MDC::put)
+ block()
+ } finally {
+ mdc.keys.forEach(MDC::remove)
+ }
+ }
+ }
}
object OffLevelLogger : AtLevelLogger() {
@@ -168,9 +182,10 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
logger.error(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.error(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.error(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -183,9 +198,10 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
logger.warn(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.warn(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.warn(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -198,9 +214,10 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
logger.info(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.info(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.info(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -213,9 +230,10 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
logger.debug(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.debug(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.debug(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
@@ -228,7 +246,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
logger.trace(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.trace(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.trace(marker.slf4jMarker, message)
+ }
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
index 83fb9a5e..9023528e 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
@@ -20,11 +20,26 @@
package org.onap.dcae.collectors.veshv.utils.logging
import org.slf4j.MarkerFactory
+import java.time.Instant
+import java.util.*
-enum class Marker(private val marker: org.slf4j.Marker) {
- ENTRY(MarkerFactory.getMarker("ENTRY")),
- EXIT(MarkerFactory.getMarker("EXIT")),
- INVOKE(MarkerFactory.getMarker("INVOKE"));
+sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<String, String> = emptyMap()) {
- operator fun invoke() = marker
+ object Entry : Marker(ENTRY)
+ object Exit : Marker(EXIT)
+
+ class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) {
+ companion object {
+ private fun mdc(id: UUID, timestamp: Instant) = mapOf(
+ OnapMdc.INVOCATION_ID to id.toString(),
+ OnapMdc.INVOCATION_TIMESTAMP to timestamp.toString()
+ )
+ }
+ }
+
+ companion object {
+ private val ENTRY = MarkerFactory.getMarker("ENTRY")
+ private val EXIT = MarkerFactory.getMarker("EXIT")
+ private val INVOKE = MarkerFactory.getMarker("INVOKE")
+ }
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt
new file mode 100644
index 00000000..86584164
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object OnapMdc {
+ const val REQUEST_ID = "RequestID"
+ const val CLIENT_NAME = "PartnerName"
+ const val CLIENT_IP = "ClientIPAddress"
+ const val INVOCATION_ID = "InvocationID"
+ const val INVOCATION_TIMESTAMP = "InvokeTimestamp"
+ const val STATUS_CODE = "StatusCode"
+ const val INSTANCE_ID = "InstanceID"
+ const val SERVER_FQDN = "ServerFQDN"
+}