diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2018-11-29 11:58:40 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2018-12-04 13:31:17 +0100 |
commit | d632aef8303701a1802f817c3d6fdcd4064c32b2 (patch) | |
tree | 70614ef073f437810beea848c9f9a81189b794d8 /sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors | |
parent | dde383a2aa75f94c26d7949665b79cc95486a223 (diff) |
Harmonize logging and add new logs
- corrected docker-compose consul url
Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54
Issue-ID: DCAEGEN2-1003
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors')
8 files changed, 82 insertions, 38 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index fb949079..93940752 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -19,20 +19,38 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Either +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader +typealias ValidationFailMessage = () -> String +typealias ValidationSuccessMessage = () -> String +typealias ValidationResult = Either<ValidationFailMessage, ValidationSuccessMessage> + internal object MessageValidator { - fun isValid(message: VesMessage): Boolean { - return allMandatoryFieldsArePresent(message.header) - } + fun validateFrameMessage(message: WireFrameMessage): ValidationResult = + message.validate().fold({ + Either.left { "Invalid wire frame header, reason: ${it.message}" } + }, { + Either.right { "Wire frame header is valid" } + }) + + fun validateProtobufMessage(message: VesMessage): ValidationResult = + if (message.isValid()) { + Either.right { "Protocol buffers message is valid" } + } else { + Either.left { "Unsupported protocol buffers message." } + } + + fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header) + .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = headerRequiredFieldDescriptors .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 1d43588f..c670e1d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try -import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -31,9 +30,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Option<VesMessage> = + fun decode(bytes: ByteData): Try<VesMessage> = Try { val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader VesMessage(decodedHeader, bytes) - }.toOption() + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2f12e0cd..4176de99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,17 +19,20 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.Either import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -49,9 +52,9 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } @@ -63,26 +66,38 @@ internal class VesHvCollector( .concatMap(decoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux + private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux + .filterFailedWithLog(MessageValidator::validateFrameMessage) + + private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } + .flatMap(::decodePayload) + + private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder + .decode(rawPayload) + .filterFailedWithLog(logger, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + + private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux + .filterFailedWithLog(MessageValidator::validateProtobufMessage) private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) - - private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( - { - logger.info("ommiting the message" + 5) - Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage) = router + .findDestination(msg) + .filterEmptyWithLog(logger, + { "Found route for message: ${it.topic}, partition: ${it.partition}" }, + { "Could not find route for message" }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + .also { logger.debug("Released buffer memory after handling message stream") } + + fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = + filterFailedWithLog(logger, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index ec7c60c0..cea8a7ee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -82,8 +82,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private fun filterDifferentValues(configurationString: String) = hashOf(configurationString).let { if (it == lastConfigurationHash.get()) { + logger.trace { "No change detected in consul configuration" } Mono.empty() } else { + logger.info { "Obtained new configuration from consul:\n${configurationString}" } lastConfigurationHash.set(it) Mono.just(configurationString) } @@ -95,7 +97,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - logger.info { "Obtained new configuration from consul:\n${configuration}" } val routing = configuration.getJsonArray("collector.routing") return CollectorConfiguration( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index a0c22418..c4d6c87e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult @@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) val result = sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) + .doOnNext { + if (it.isSuccessful()) { + Mono.just(it) + } else { + logger.warn(it.exception()) { "Failed to send message to Kafka" } + Mono.empty<SenderResult<RoutedMessage>>() + } + } .map { it.correlationMetadata() } return if (logger.traceEnabled) { @@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM msg) } - private fun logException(senderResult: SenderResult<out Any>) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - private fun logSentMessage(sentMsg: RoutedMessage) { logger.trace { val msgNum = sentMessages.incrementAndGet() @@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } } - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + private fun SenderResult<out Any>.isSuccessful() = exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index e535300a..0b2997fa 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -57,8 +57,12 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> + logger.info("Collector configured with SSL enabled") this.secure { b -> b.sslContext(sslContext) } - }.getOrElse { this } + }.getOrElse { + logger.info("Collector configured with SSL disabled") + this + } private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = collectorProvider().fold( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index f5bfcce1..1965d78c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -20,13 +20,10 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) { - fun isValid(): Boolean = MessageValidator.isValid(this) -} +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bab95c57..437614ac 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -20,12 +20,21 @@ package org.onap.dcae.collectors.veshv.model import arrow.core.Option +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) + Option.fromNullable(routes.find { it.applies(commonHeader) }).also { + if (it.isEmpty()) { + logger.debug { "No route is defined for domain: ${commonHeader.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { |