aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-11-29 11:58:40 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2018-12-04 13:31:17 +0100
commitd632aef8303701a1802f817c3d6fdcd4064c32b2 (patch)
tree70614ef073f437810beea848c9f9a81189b794d8 /sources/hv-collector-core/src/main/kotlin
parentdde383a2aa75f94c26d7949665b79cc95486a223 (diff)
Harmonize logging and add new logs
- corrected docker-compose consul url Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54 Issue-ID: DCAEGEN2-1003 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt26
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt19
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt11
8 files changed, 82 insertions, 38 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index fb949079..93940752 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -19,20 +19,38 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Either
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+typealias ValidationFailMessage = () -> String
+typealias ValidationSuccessMessage = () -> String
+typealias ValidationResult = Either<ValidationFailMessage, ValidationSuccessMessage>
+
internal object MessageValidator {
- fun isValid(message: VesMessage): Boolean {
- return allMandatoryFieldsArePresent(message.header)
- }
+ fun validateFrameMessage(message: WireFrameMessage): ValidationResult =
+ message.validate().fold({
+ Either.left { "Invalid wire frame header, reason: ${it.message}" }
+ }, {
+ Either.right { "Wire frame header is valid" }
+ })
+
+ fun validateProtobufMessage(message: VesMessage): ValidationResult =
+ if (message.isValid()) {
+ Either.right { "Protocol buffers message is valid" }
+ } else {
+ Either.left { "Unsupported protocol buffers message." }
+ }
+
+ fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header)
+ .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
headerRequiredFieldDescriptors
.all { fieldDescriptor -> header.hasField(fieldDescriptor) }
- .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index 1d43588f..c670e1d8 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Try
-import arrow.core.Option
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.VesEvent
@@ -31,9 +30,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent
*/
internal class VesDecoder {
- fun decode(bytes: ByteData): Option<VesMessage> =
+ fun decode(bytes: ByteData): Try<VesMessage> =
Try {
val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
VesMessage(decodedHeader, bytes)
- }.toOption()
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 2f12e0cd..4176de99 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,17 +19,20 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
+import arrow.core.Either
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -49,9 +52,9 @@ internal class VesHvCollector(
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(WireFrameMessage::isValid)
- .transform(::decodePayload)
- .filter(VesMessage::isValid)
+ .transform(::filterInvalidWireFrame)
+ .transform(::decodeProtobufPayload)
+ .transform(::filterInvalidProtobufMessages)
.transform(::routeMessage)
.onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
@@ -63,26 +66,38 @@ internal class VesHvCollector(
.concatMap(decoder::decode)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
+ .filterFailedWithLog(MessageValidator::validateFrameMessage)
+
+ private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
.map(WireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .flatMap { omitWhenNone(it) }
+ .flatMap(::decodePayload)
+
+ private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
+ .decode(rawPayload)
+ .filterFailedWithLog(logger,
+ { "Ves event header decoded successfully" },
+ { "Failed to decode ves event header, reason: ${it.message}" })
+
+ private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
+ .filterFailedWithLog(MessageValidator::validateProtobufMessage)
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
.doOnNext { metrics.notifyMessageSent(it.topic) }
-
- private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
- private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
- {
- logger.info("ommiting the message" + 5)
- Mono.empty() },
- { Mono.just(it) })
+ private fun findRoute(msg: VesMessage) = router
+ .findDestination(msg)
+ .filterEmptyWithLog(logger,
+ { "Found route for message: ${it.topic}, partition: ${it.partition}" },
+ { "Could not find route for message" })
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+ .also { logger.debug("Released buffer memory after handling message stream") }
+
+ fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+ filterFailedWithLog(logger, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index ec7c60c0..cea8a7ee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -82,8 +82,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private fun filterDifferentValues(configurationString: String) =
hashOf(configurationString).let {
if (it == lastConfigurationHash.get()) {
+ logger.trace { "No change detected in consul configuration" }
Mono.empty()
} else {
+ logger.info { "Obtained new configuration from consul:\n${configurationString}" }
lastConfigurationHash.set(it)
Mono.just(configurationString)
}
@@ -95,7 +97,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- logger.info { "Obtained new configuration from consul:\n${configuration}" }
val routing = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index a0c22418..c4d6c87e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
@@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
val result = sender.send(records)
- .doOnNext(::logException)
- .filter(::isSuccessful)
+ .doOnNext {
+ if (it.isSuccessful()) {
+ Mono.just(it)
+ } else {
+ logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ Mono.empty<SenderResult<RoutedMessage>>()
+ }
+ }
.map { it.correlationMetadata() }
return if (logger.traceEnabled) {
@@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
msg)
}
- private fun logException(senderResult: SenderResult<out Any>) {
- if (senderResult.exception() != null) {
- logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
- }
- }
-
private fun logSentMessage(sentMsg: RoutedMessage) {
logger.trace {
val msgNum = sentMessages.incrementAndGet()
@@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun SenderResult<out Any>.isSuccessful() = exception() == null
companion object {
val logger = Logger(KafkaSink::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index e535300a..0b2997fa 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -57,8 +57,12 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
sslContextFactory
.createSslContext(serverConfig.securityConfiguration)
.map { sslContext ->
+ logger.info("Collector configured with SSL enabled")
this.secure { b -> b.sslContext(sslContext) }
- }.getOrElse { this }
+ }.getOrElse {
+ logger.info("Collector configured with SSL disabled")
+ this
+ }
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
collectorProvider().fold(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index f5bfcce1..1965d78c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -20,13 +20,10 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
- fun isValid(): Boolean = MessageValidator.isValid(this)
-}
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index bab95c57..437614ac 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -20,12 +20,21 @@
package org.onap.dcae.collectors.veshv.model
import arrow.core.Option
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) })
+ Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
+ if (it.isEmpty()) {
+ logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
+ }
+ }
+
+ companion object {
+ private val logger = Logger(Routing::class)
+ }
}
data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {