From d632aef8303701a1802f817c3d6fdcd4064c32b2 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 29 Nov 2018 11:58:40 +0100 Subject: Harmonize logging and add new logs - corrected docker-compose consul url Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54 Issue-ID: DCAEGEN2-1003 Signed-off-by: Filip Krzywka --- .../dcae/collectors/veshv/impl/MessageValidator.kt | 26 +++++++++++-- .../onap/dcae/collectors/veshv/impl/VesDecoder.kt | 5 +-- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 45 ++++++++++++++-------- .../impl/adapters/ConsulConfigurationProvider.kt | 3 +- .../veshv/impl/adapters/kafka/KafkaSink.kt | 19 ++++----- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 6 ++- .../onap/dcae/collectors/veshv/model/VesMessage.kt | 5 +-- .../onap/dcae/collectors/veshv/model/routing.kt | 11 +++++- 8 files changed, 82 insertions(+), 38 deletions(-) (limited to 'sources/hv-collector-core/src/main') diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index fb949079..93940752 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -19,20 +19,38 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Either +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader +typealias ValidationFailMessage = () -> String +typealias ValidationSuccessMessage = () -> String +typealias ValidationResult = Either + internal object MessageValidator { - fun isValid(message: VesMessage): Boolean { - return allMandatoryFieldsArePresent(message.header) - } + fun validateFrameMessage(message: WireFrameMessage): ValidationResult = + message.validate().fold({ + Either.left { "Invalid wire frame header, reason: ${it.message}" } + }, { + Either.right { "Wire frame header is valid" } + }) + + fun validateProtobufMessage(message: VesMessage): ValidationResult = + if (message.isValid()) { + Either.right { "Protocol buffers message is valid" } + } else { + Either.left { "Unsupported protocol buffers message." } + } + + fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header) + .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = headerRequiredFieldDescriptors .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 1d43588f..c670e1d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try -import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -31,9 +30,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Option = + fun decode(bytes: ByteData): Try = Try { val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader VesMessage(decodedHeader, bytes) - }.toOption() + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2f12e0cd..4176de99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,17 +19,20 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.Either import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -49,9 +52,9 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } @@ -63,26 +66,38 @@ internal class VesHvCollector( .concatMap(decoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux): Flux = flux + private fun filterInvalidWireFrame(flux: Flux): Flux = flux + .filterFailedWithLog(MessageValidator::validateFrameMessage) + + private fun decodeProtobufPayload(flux: Flux): Flux = flux .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } + .flatMap(::decodePayload) + + private fun decodePayload(rawPayload: ByteData): Flux = protobufDecoder + .decode(rawPayload) + .filterFailedWithLog(logger, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + + private fun filterInvalidProtobufMessages(flux: Flux): Flux = flux + .filterFailedWithLog(MessageValidator::validateProtobufMessage) private fun routeMessage(flux: Flux): Flux = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - - private fun findRoute(msg: VesMessage): Mono = omitWhenNone((router::findDestination)(msg)) - - private fun omitWhenNone(it: Option): Mono = it.fold( - { - logger.info("ommiting the message" + 5) - Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage) = router + .findDestination(msg) + .filterEmptyWithLog(logger, + { "Found route for message: ${it.topic}, partition: ${it.partition}" }, + { "Could not find route for message" }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + .also { logger.debug("Released buffer memory after handling message stream") } + + fun Flux.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = + filterFailedWithLog(logger, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index ec7c60c0..cea8a7ee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -82,8 +82,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private fun filterDifferentValues(configurationString: String) = hashOf(configurationString).let { if (it == lastConfigurationHash.get()) { + logger.trace { "No change detected in consul configuration" } Mono.empty() } else { + logger.info { "Obtained new configuration from consul:\n${configurationString}" } lastConfigurationHash.set(it) Mono.just(configurationString) } @@ -95,7 +97,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - logger.info { "Obtained new configuration from consul:\n${configuration}" } val routing = configuration.getJsonArray("collector.routing") return CollectorConfiguration( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index a0c22418..c4d6c87e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult @@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender): Flux { val records = messages.map(this::vesToKafkaRecord) val result = sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) + .doOnNext { + if (it.isSuccessful()) { + Mono.just(it) + } else { + logger.warn(it.exception()) { "Failed to send message to Kafka" } + Mono.empty>() + } + } .map { it.correlationMetadata() } return if (logger.traceEnabled) { @@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - private fun logSentMessage(sentMsg: RoutedMessage) { logger.trace { val msgNum = sentMessages.incrementAndGet() @@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender) = senderResult.exception() == null + private fun SenderResult.isSuccessful() = exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index e535300a..0b2997fa 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -57,8 +57,12 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> + logger.info("Collector configured with SSL enabled") this.secure { b -> b.sslContext(sslContext) } - }.getOrElse { this } + }.getOrElse { + logger.info("Collector configured with SSL disabled") + this + } private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = collectorProvider().fold( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index f5bfcce1..1965d78c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -20,13 +20,10 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) { - fun isValid(): Boolean = MessageValidator.isValid(this) -} +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bab95c57..437614ac 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -20,12 +20,21 @@ package org.onap.dcae.collectors.veshv.model import arrow.core.Option +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List) { fun routeFor(commonHeader: CommonEventHeader): Option = - Option.fromNullable(routes.find { it.applies(commonHeader) }) + Option.fromNullable(routes.find { it.applies(commonHeader) }).also { + if (it.isEmpty()) { + logger.debug { "No route is defined for domain: ${commonHeader.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { -- cgit 1.2.3-korg