aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt48
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt45
8 files changed, 176 insertions, 61 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index cf73aed8..ca1605e6 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -74,7 +74,7 @@ internal class VesHvCollector(
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
- .filterFailedWithLog(logger, clientContext::asMap,
+ .filterFailedWithLog(logger, clientContext::fullMdc,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
@@ -88,7 +88,7 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
- .filterEmptyWithLog(logger, clientContext::asMap,
+ .filterEmptyWithLog(logger, clientContext::fullMdc,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
@@ -96,7 +96,7 @@ internal class VesHvCollector(
.also { logger.debug { "Released buffer memory after handling message stream" } }
fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
- filterFailedWithLog(logger, clientContext::asMap, predicate)
+ filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
index 21b79bbe..954de978 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
@@ -27,21 +27,21 @@ import reactor.core.publisher.Flux
@Suppress("TooManyFunctions")
internal object ClientContextLogging {
- fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
- fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
- fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
- fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
- fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+ fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
+ fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
+ fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
+ fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block)
+ fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block)
- fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
- fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
- fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
- fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
- fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+ fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message)
+ fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message)
+ fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message)
+ fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message)
+ fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message)
fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+ return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index bbaa47c4..14d511be 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -24,13 +24,16 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
+import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
@@ -52,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.withWarn { log("Could not get fresh configuration", it.exception()) }
+ logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
@@ -77,17 +80,26 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
.map(::createCollectorConfiguration)
.retryWhen(retry)
- private fun askForConfig(): Mono<String> = http.get(url)
+ private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
+ val invocationId = UUID.randomUUID()
+ http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
+ }
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- logger.trace { "No change detected in consul configuration" }
- Mono.empty()
- } else {
- logger.info { "Obtained new configuration from consul:\n${configurationString}" }
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
+ private fun filterDifferentValues(configuration: BodyWithInvocationId) =
+ configuration.body.let { configurationString ->
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "No change detected in consul configuration"
+ }
+ Mono.empty()
+ } else {
+ logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "Obtained new configuration from consul:\n${configurationString}"
+ }
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
}
}
@@ -119,5 +131,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
-}
+ private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 3fefc6e8..51f7410b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import io.netty.handler.codec.http.HttpStatusClass
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
+import java.util.*
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -31,21 +33,23 @@ import reactor.netty.http.client.HttpClient
*/
open class HttpAdapter(private val httpClient: HttpClient) {
- open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get()
- .uri(url + createQueryString(queryParams))
- .responseSingle { response, content ->
- if (response.status().codeClass() == HttpStatusClass.SUCCESS)
- content.asString()
- else {
- val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
- Mono.error(IllegalStateException(errorMessage))
- }
- }
- .doOnError {
- logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
- logger.withDebug { log("Nested exception:", it) }
- }
+ open fun get(url: String, invocationId: UUID, queryParams: Map<String, Any> = emptyMap()): Mono<String> =
+ httpClient
+ .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() }
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else {
+ val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+ Mono.error(IllegalStateException(errorMessage))
+ }
+ }
+ .doOnError {
+ logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+ logger.withDebug { log("Nested exception:", it) }
+ }
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
@@ -65,8 +69,7 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
companion object {
-
-
private val logger = Logger(HttpAdapter::class)
+ const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}"
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 690a7d1e..b4f9a90c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::asMap, Marker.INVOKE) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 6f02d43e..d8d786be 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.core.None
+import arrow.core.Option
import arrow.core.getOrElse
+import arrow.core.toOption
import arrow.effects.IO
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,6 +45,10 @@ import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
import java.time.Duration
+import java.lang.Exception
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -72,17 +81,21 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
val clientContext = ClientContext(nettyOutbound.alloc())
nettyInbound.withConnection {
- clientContext.clientAddress = it.address()
+ populateClientContext(clientContext, it)
+ it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
+ sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ }
+
}
- logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
Mono.empty()
},
{
- logger.info(clientContext::asMap) { "Handling new connection" }
+ logger.info(clientContext::fullMdc) { "Handling new connection" }
nettyInbound.withConnection { conn ->
conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
@@ -92,6 +105,29 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
)
}
+ private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
+ clientContext.clientAddress = try {
+ Option.fromNullable(connection.address().address)
+ } catch (ex: Exception) {
+ None
+ }
+ clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
+ }
+
+ private fun getSslSession(connection: Connection) = Option.fromNullable(
+ connection
+ .channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session)
+
+ private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
+ sslSession
+ .peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
@@ -108,7 +144,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
- logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
if (it.isSuccess)
logger.debug(ctx) { "Channel closed successfully." }
else
@@ -119,7 +155,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
// TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
- logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
}
return this
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
index 305e4cb1..7b082e64 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
@@ -19,10 +19,13 @@
*/
package org.onap.dcae.collectors.veshv.model
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.security.cert.X509Certificate
import java.util.*
/**
@@ -31,13 +34,28 @@ import java.util.*
*/
data class ClientContext(
val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
- val clientId: String = UUID.randomUUID().toString(),
- var clientAddress: InetSocketAddress? = null) {
- fun asMap(): Map<String, String> {
- val result = mutableMapOf("clientId" to clientId)
- if (clientAddress != null) {
- result["clientAddress"] = clientAddress.toString()
- }
- return result
+ var clientAddress: Option<InetAddress> = None,
+ var clientCert: Option<X509Certificate> = None,
+ val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP
+ val invocationId: String = UUID.randomUUID().toString()) {
+
+ val mdc: Map<String, String>
+ get() = mapOf(
+ OnapMdc.REQUEST_ID to requestId,
+ OnapMdc.INVOCATION_ID to invocationId,
+ OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE,
+ OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE },
+ OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE }
+ )
+
+ val fullMdc: Map<String, String>
+ get() = mdc + ServiceContext.mdc
+
+ private fun clientDn(): Option<String> = clientCert.map { it.subjectX500Principal.toString() }
+ private fun clientIp(): Option<String> = clientAddress.map(InetAddress::getHostAddress)
+
+ companion object {
+ const val DEFAULT_STATUS_CODE = "INPROGRESS"
+ const val DEFAULT_VALUE = ""
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
new file mode 100644
index 00000000..2407eced
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.net.UnknownHostException
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object ServiceContext {
+ val instanceId = UUID.randomUUID().toString()
+ val serverFqdn = getHost().hostName
+
+ val mdc = mapOf(
+ OnapMdc.INSTANCE_ID to instanceId,
+ OnapMdc.SERVER_FQDN to serverFqdn
+ )
+
+ private fun getHost() = try {
+ InetAddress.getLocalHost()
+ } catch (ex: UnknownHostException) {
+ InetAddress.getLoopbackAddress()
+ }
+}