diff options
Diffstat (limited to 'sources')
16 files changed, 456 insertions, 89 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index cf73aed8..ca1605e6 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -74,7 +74,7 @@ internal class VesHvCollector( private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder .decode(rawPayload) - .filterFailedWithLog(logger, clientContext::asMap, + .filterFailedWithLog(logger, clientContext::fullMdc, { "Ves event header decoded successfully" }, { "Failed to decode ves event header, reason: ${it.message}" }) @@ -88,7 +88,7 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage) = router .findDestination(msg) - .filterEmptyWithLog(logger, clientContext::asMap, + .filterEmptyWithLog(logger, clientContext::fullMdc, { "Found route for message: ${it.topic}, partition: ${it.partition}" }, { "Could not find route for message" }) @@ -96,7 +96,7 @@ internal class VesHvCollector( .also { logger.debug { "Released buffer memory after handling message stream" } } fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = - filterFailedWithLog(logger, clientContext::asMap, predicate) + filterFailedWithLog(logger, clientContext::fullMdc, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt index 21b79bbe..954de978 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt @@ -27,21 +27,21 @@ import reactor.core.publisher.Flux @Suppress("TooManyFunctions") internal object ClientContextLogging { - fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block) - fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block) - fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block) - fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block) - fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block) + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block) - fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message) - fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message) - fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message) - fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message) - fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message) + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message) fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { - return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux) + return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index bbaa47c4..14d511be 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -24,13 +24,16 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams +import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter import reactor.retry.Retry import java.io.StringReader import java.time.Duration +import java.util.* import java.util.concurrent.atomic.AtomicReference import javax.json.Json import javax.json.JsonObject @@ -52,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) private val retry = retrySpec .doOnRetry { - logger.withWarn { log("Could not get fresh configuration", it.exception()) } + logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) } healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } @@ -77,17 +80,26 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, .map(::createCollectorConfiguration) .retryWhen(retry) - private fun askForConfig(): Mono<String> = http.get(url) + private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer { + val invocationId = UUID.randomUUID() + http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) } + } - private fun filterDifferentValues(configurationString: String) = - hashOf(configurationString).let { - if (it == lastConfigurationHash.get()) { - logger.trace { "No change detected in consul configuration" } - Mono.empty() - } else { - logger.info { "Obtained new configuration from consul:\n${configurationString}" } - lastConfigurationHash.set(it) - Mono.just(configurationString) + private fun filterDifferentValues(configuration: BodyWithInvocationId) = + configuration.body.let { configurationString -> + hashOf(configurationString).let { + if (it == lastConfigurationHash.get()) { + logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { + "No change detected in consul configuration" + } + Mono.empty() + } else { + logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) { + "Obtained new configuration from consul:\n${configurationString}" + } + lastConfigurationHash.set(it) + Mono.just(configurationString) + } } } @@ -119,5 +131,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) } -} + private data class BodyWithInvocationId(val body: String, val invocationId: UUID) +} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 3fefc6e8..51f7410b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -21,9 +21,11 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient +import java.util.* /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -31,21 +33,23 @@ import reactor.netty.http.client.HttpClient */ open class HttpAdapter(private val httpClient: HttpClient) { - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient - .get() - .uri(url + createQueryString(queryParams)) - .responseSingle { response, content -> - if (response.status().codeClass() == HttpStatusClass.SUCCESS) - content.asString() - else { - val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}" - Mono.error(IllegalStateException(errorMessage)) - } - } - .doOnError { - logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } - logger.withDebug { log("Nested exception:", it) } - } + open fun get(url: String, invocationId: UUID, queryParams: Map<String, Any> = emptyMap()): Mono<String> = + httpClient + .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() } + .get() + .uri(url + createQueryString(queryParams)) + .responseSingle { response, content -> + if (response.status().codeClass() == HttpStatusClass.SUCCESS) + content.asString() + else { + val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}" + Mono.error(IllegalStateException(errorMessage)) + } + } + .doOnError { + logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" } + logger.withDebug { log("Nested exception:", it) } + } private fun createQueryString(params: Map<String, Any>): String { if (params.isEmpty()) @@ -65,8 +69,7 @@ open class HttpAdapter(private val httpClient: HttpClient) { } companion object { - - private val logger = Logger(HttpAdapter::class) + const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 690a7d1e..b4f9a90c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace(ctx::asMap, Marker.INVOKE) { + logger.trace(ctx::fullMdc, Marker.Invoke()) { val msgNum = sentMessages.incrementAndGet() "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 6f02d43e..d8d786be 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,13 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.core.None +import arrow.core.Option import arrow.core.getOrElse +import arrow.core.toOption import arrow.effects.IO +import arrow.syntax.collections.firstOption +import io.netty.handler.ssl.SslHandler import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ClientContext @@ -40,6 +45,10 @@ import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer import java.time.Duration +import java.lang.Exception +import java.security.cert.X509Certificate +import javax.net.ssl.SSLSession + /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -72,17 +81,21 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { val clientContext = ClientContext(nettyOutbound.alloc()) nettyInbound.withConnection { - clientContext.clientAddress = it.address() + populateClientContext(clientContext, it) + it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> + sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } + } + } - logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" } + logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } return collectorProvider(clientContext).fold( { - logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." } + logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." } Mono.empty() }, { - logger.info(clientContext::asMap) { "Handling new connection" } + logger.info(clientContext::fullMdc) { "Handling new connection" } nettyInbound.withConnection { conn -> conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout) .logConnectionClosed(clientContext) @@ -92,6 +105,29 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, ) } + private fun populateClientContext(clientContext: ClientContext, connection: Connection) { + clientContext.clientAddress = try { + Option.fromNullable(connection.address().address) + } catch (ex: Exception) { + None + } + clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert) + } + + private fun getSslSession(connection: Connection) = Option.fromNullable( + connection + .channel() + .pipeline() + .get(SslHandler::class.java) + ?.engine() + ?.session) + + private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> = + sslSession + .peerCertificates + .firstOption() + .flatMap { Option.fromNullable(it as? X509Certificate) } + private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() @@ -108,7 +144,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun Connection.disconnectClient(ctx: ClientContext) { channel().close().addListener { - logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." } + logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." } if (it.isSuccess) logger.debug(ctx) { "Channel closed successfully." } else @@ -119,7 +155,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun Connection.logConnectionClosed(ctx: ClientContext): Connection { onTerminate().subscribe { // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled) - logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" } + logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } } return this } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt index 305e4cb1..7b082e64 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt @@ -19,10 +19,13 @@ */ package org.onap.dcae.collectors.veshv.model +import arrow.core.None +import arrow.core.Option +import arrow.core.getOrElse import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.net.InetSocketAddress +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import java.net.InetAddress +import java.security.cert.X509Certificate import java.util.* /** @@ -31,13 +34,28 @@ import java.util.* */ data class ClientContext( val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, - val clientId: String = UUID.randomUUID().toString(), - var clientAddress: InetSocketAddress? = null) { - fun asMap(): Map<String, String> { - val result = mutableMapOf("clientId" to clientId) - if (clientAddress != null) { - result["clientAddress"] = clientAddress.toString() - } - return result + var clientAddress: Option<InetAddress> = None, + var clientCert: Option<X509Certificate> = None, + val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP + val invocationId: String = UUID.randomUUID().toString()) { + + val mdc: Map<String, String> + get() = mapOf( + OnapMdc.REQUEST_ID to requestId, + OnapMdc.INVOCATION_ID to invocationId, + OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE, + OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE }, + OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE } + ) + + val fullMdc: Map<String, String> + get() = mdc + ServiceContext.mdc + + private fun clientDn(): Option<String> = clientCert.map { it.subjectX500Principal.toString() } + private fun clientIp(): Option<String> = clientAddress.map(InetAddress::getHostAddress) + + companion object { + const val DEFAULT_STATUS_CODE = "INPROGRESS" + const val DEFAULT_VALUE = "" } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt new file mode 100644 index 00000000..2407eced --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import java.net.InetAddress +import java.net.UnknownHostException +import java.util.* + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +object ServiceContext { + val instanceId = UUID.randomUUID().toString() + val serverFqdn = getHost().hostName + + val mdc = mapOf( + OnapMdc.INSTANCE_ID to instanceId, + OnapMdc.SERVER_FQDN to serverFqdn + ) + + private fun getHost() = try { + InetAddress.getLocalHost() + } catch (ex: UnknownHostException) { + InetAddress.getLoopbackAddress() + } +} diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index cdee92c9..605e7a6e 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -19,9 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option import arrow.core.Try -import arrow.core.success import com.google.protobuf.ByteString import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -32,7 +30,6 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import reactor.test.test import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index c6364f74..9ce0c3db 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever @@ -57,7 +58,7 @@ internal object ConsulConfigurationProviderTest : Spek({ val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) on("call to consul") { - whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) + whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap())) .thenReturn(Mono.just(constructConsulResponse())) it("should use received configuration") { @@ -97,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({ ) on("call to consul") { - whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) + whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap())) .thenReturn(Mono.error(RuntimeException("Test exception"))) it("should interrupt the flux") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt index 91457faf..f55b1fdf 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -23,11 +23,13 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.impl.adapters.HttpAdapter.Companion.INVOCATION_ID_HEADER import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient import reactor.netty.http.server.HttpServer import reactor.test.StepVerifier import reactor.test.test +import java.util.* /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -42,6 +44,9 @@ internal object HttpAdapterTest : Spek({ routes.get("/url") { req, resp -> resp.sendString(Mono.just(req.uri())) } + routes.get("/inv-id") { req, resp -> + resp.sendString(Mono.just(req.requestHeaders()[INVOCATION_ID_HEADER])) + } } .bindNow() val baseUrl = "http://${httpServer.host()}:${httpServer.port()}" @@ -53,31 +58,46 @@ internal object HttpAdapterTest : Spek({ given("url without query params") { val url = "/url" + val invocationId = UUID.randomUUID() it("should not append query string") { - httpAdapter.get(url).test() + httpAdapter.get(url, invocationId).test() .expectNext(url) .verifyComplete() } + + it("should pass invocation id") { + httpAdapter.get("/inv-id", invocationId).test() + .expectNext(invocationId.toString()) + .verifyComplete() + } } given("url with query params") { val queryParams = mapOf(Pair("p", "the-value")) val url = "/url" + val invocationId = UUID.randomUUID() it("should add them as query string to the url") { - httpAdapter.get(url, queryParams).test() + httpAdapter.get(url, invocationId, queryParams).test() .expectNext("/url?p=the-value") .verifyComplete() } + + it("should pass invocation id") { + httpAdapter.get("/inv-id", invocationId, queryParams).test() + .expectNext(invocationId.toString()) + .verifyComplete() + } } given("invalid url") { val invalidUrl = "/wtf" + val invocationId = UUID.randomUUID() it("should interrupt the flux") { StepVerifier - .create(httpAdapter.get(invalidUrl)) + .create(httpAdapter.get(invalidUrl, invocationId)) .verifyError() } } diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt new file mode 100644 index 00000000..a49428a7 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import java.net.Inet4Address +import java.net.InetAddress +import java.net.InetSocketAddress +import java.security.cert.X509Certificate +import java.util.* +import javax.security.auth.x500.X500Principal + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +internal object ClientContextTest : Spek({ + describe("ClientContext") { + given("default instance") { + val cut = ClientContext() + + on("mapped diagnostic context") { + val mdc = cut.mdc + + it("should contain ${OnapMdc.REQUEST_ID}") { + assertThat(mdc[OnapMdc.REQUEST_ID]).isEqualTo(cut.requestId) + } + + it("should contain ${OnapMdc.INVOCATION_ID}") { + assertThat(mdc[OnapMdc.INVOCATION_ID]).isEqualTo(cut.invocationId) + } + + it("should contain ${OnapMdc.STATUS_CODE}") { + assertThat(mdc[OnapMdc.STATUS_CODE]).isEqualTo("INPROGRESS") + } + + it("should contain ${OnapMdc.CLIENT_NAME}") { + assertThat(mdc[OnapMdc.CLIENT_NAME]).isBlank() + } + + it("should contain ${OnapMdc.CLIENT_IP}") { + assertThat(mdc[OnapMdc.CLIENT_IP]).isBlank() + } + } + } + + given("instance with client data") { + val clientDn = "C=PL, O=Nokia, CN=NokiaBTS" + val clientIp = "192.168.52.34" + val cert: X509Certificate = mock() + val principal: X500Principal = mock() + val cut = ClientContext( + clientAddress = Some(InetAddress.getByName(clientIp)), + clientCert = Some(cert)) + + whenever(cert.subjectX500Principal).thenReturn(principal) + whenever(principal.toString()).thenReturn(clientDn) + + on("mapped diagnostic context") { + val mdc = cut.mdc + + it("should contain ${OnapMdc.CLIENT_NAME}") { + assertThat(mdc[OnapMdc.CLIENT_NAME]).isEqualTo(clientDn) + } + + it("should contain ${OnapMdc.CLIENT_IP}") { + assertThat(mdc[OnapMdc.CLIENT_IP]).isEqualTo(clientIp) + } + } + } + } +}) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt new file mode 100644 index 00000000..5b6e4526 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.model + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import java.util.* + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +internal object ServiceContextTest : Spek({ + describe("ServiceContext") { + given("singleton instance") { + val cut = ServiceContext + + on("instanceId") { + val instanceId = cut.instanceId + it("should be valid UUID") { + UUID.fromString(instanceId) // should not throw + } + } + + on("serverFqdn") { + val serverFqdn = cut.serverFqdn + it("should be non empty") { + assertThat(serverFqdn).isNotBlank() + } + } + + on("mapped diagnostic context") { + val mdc = cut.mdc + + it("should contain ${OnapMdc.INSTANCE_ID}") { + assertThat(mdc[OnapMdc.INSTANCE_ID]).isEqualTo(cut.instanceId) + } + + it("should contain ${OnapMdc.SERVER_FQDN}") { + assertThat(mdc[OnapMdc.SERVER_FQDN]).isEqualTo(cut.serverFqdn) + } + } + } + } +}) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 1e5c9c55..938ba793 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -127,6 +127,7 @@ abstract class AtLevelLogger { abstract fun log(message: String) abstract fun log(message: String, t: Throwable) abstract fun log(marker: Marker, message: String) + open val enabled: Boolean get() = true @@ -140,6 +141,19 @@ abstract class AtLevelLogger { } } } + + protected fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) { + if (mdc.isEmpty()) { + block() + } else { + try { + mdc.forEach(MDC::put) + block() + } finally { + mdc.keys.forEach(MDC::remove) + } + } + } } object OffLevelLogger : AtLevelLogger() { @@ -168,9 +182,10 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { logger.error(message, t) } - override fun log(marker: Marker, message: String) { - logger.error(marker(), message) - } + override fun log(marker: Marker, message: String) = + withAdditionalMdc(marker.mdc) { + logger.error(marker.slf4jMarker, message) + } } @Suppress("SuboptimalLoggerUsage") @@ -183,9 +198,10 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { logger.warn(message, t) } - override fun log(marker: Marker, message: String) { - logger.warn(marker(), message) - } + override fun log(marker: Marker, message: String) = + withAdditionalMdc(marker.mdc) { + logger.warn(marker.slf4jMarker, message) + } } @Suppress("SuboptimalLoggerUsage") @@ -198,9 +214,10 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { logger.info(message, t) } - override fun log(marker: Marker, message: String) { - logger.info(marker(), message) - } + override fun log(marker: Marker, message: String) = + withAdditionalMdc(marker.mdc) { + logger.info(marker.slf4jMarker, message) + } } @Suppress("SuboptimalLoggerUsage") @@ -213,9 +230,10 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { logger.debug(message, t) } - override fun log(marker: Marker, message: String) { - logger.debug(marker(), message) - } + override fun log(marker: Marker, message: String) = + withAdditionalMdc(marker.mdc) { + logger.debug(marker.slf4jMarker, message) + } } @Suppress("SuboptimalLoggerUsage") @@ -228,7 +246,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { logger.trace(message, t) } - override fun log(marker: Marker, message: String) { - logger.trace(marker(), message) - } + override fun log(marker: Marker, message: String) = + withAdditionalMdc(marker.mdc) { + logger.trace(marker.slf4jMarker, message) + } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt index 83fb9a5e..9023528e 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt @@ -20,11 +20,26 @@ package org.onap.dcae.collectors.veshv.utils.logging import org.slf4j.MarkerFactory +import java.time.Instant +import java.util.* -enum class Marker(private val marker: org.slf4j.Marker) { - ENTRY(MarkerFactory.getMarker("ENTRY")), - EXIT(MarkerFactory.getMarker("EXIT")), - INVOKE(MarkerFactory.getMarker("INVOKE")); +sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<String, String> = emptyMap()) { - operator fun invoke() = marker + object Entry : Marker(ENTRY) + object Exit : Marker(EXIT) + + class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) { + companion object { + private fun mdc(id: UUID, timestamp: Instant) = mapOf( + OnapMdc.INVOCATION_ID to id.toString(), + OnapMdc.INVOCATION_TIMESTAMP to timestamp.toString() + ) + } + } + + companion object { + private val ENTRY = MarkerFactory.getMarker("ENTRY") + private val EXIT = MarkerFactory.getMarker("EXIT") + private val INVOKE = MarkerFactory.getMarker("INVOKE") + } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt new file mode 100644 index 00000000..86584164 --- /dev/null +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.logging + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +object OnapMdc { + const val REQUEST_ID = "RequestID" + const val CLIENT_NAME = "PartnerName" + const val CLIENT_IP = "ClientIPAddress" + const val INVOCATION_ID = "InvocationID" + const val INVOCATION_TIMESTAMP = "InvokeTimestamp" + const val STATUS_CODE = "StatusCode" + const val INSTANCE_ID = "InstanceID" + const val SERVER_FQDN = "ServerFQDN" +} |