diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2018-11-29 11:58:40 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2018-12-04 13:31:17 +0100 |
commit | d632aef8303701a1802f817c3d6fdcd4064c32b2 (patch) | |
tree | 70614ef073f437810beea848c9f9a81189b794d8 /sources/hv-collector-core | |
parent | dde383a2aa75f94c26d7949665b79cc95486a223 (diff) |
Harmonize logging and add new logs
- corrected docker-compose consul url
Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54
Issue-ID: DCAEGEN2-1003
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
10 files changed, 180 insertions, 58 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index fb949079..93940752 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -19,20 +19,38 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Either +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader +typealias ValidationFailMessage = () -> String +typealias ValidationSuccessMessage = () -> String +typealias ValidationResult = Either<ValidationFailMessage, ValidationSuccessMessage> + internal object MessageValidator { - fun isValid(message: VesMessage): Boolean { - return allMandatoryFieldsArePresent(message.header) - } + fun validateFrameMessage(message: WireFrameMessage): ValidationResult = + message.validate().fold({ + Either.left { "Invalid wire frame header, reason: ${it.message}" } + }, { + Either.right { "Wire frame header is valid" } + }) + + fun validateProtobufMessage(message: VesMessage): ValidationResult = + if (message.isValid()) { + Either.right { "Protocol buffers message is valid" } + } else { + Either.left { "Unsupported protocol buffers message." } + } + + fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header) + .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = headerRequiredFieldDescriptors .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 1d43588f..c670e1d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try -import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -31,9 +30,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Option<VesMessage> = + fun decode(bytes: ByteData): Try<VesMessage> = Try { val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader VesMessage(decodedHeader, bytes) - }.toOption() + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2f12e0cd..4176de99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,17 +19,20 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.Either import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -49,9 +52,9 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } @@ -63,26 +66,38 @@ internal class VesHvCollector( .concatMap(decoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux + private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux + .filterFailedWithLog(MessageValidator::validateFrameMessage) + + private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } + .flatMap(::decodePayload) + + private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder + .decode(rawPayload) + .filterFailedWithLog(logger, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + + private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux + .filterFailedWithLog(MessageValidator::validateProtobufMessage) private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) - - private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( - { - logger.info("ommiting the message" + 5) - Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage) = router + .findDestination(msg) + .filterEmptyWithLog(logger, + { "Found route for message: ${it.topic}, partition: ${it.partition}" }, + { "Could not find route for message" }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + .also { logger.debug("Released buffer memory after handling message stream") } + + fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = + filterFailedWithLog(logger, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index ec7c60c0..cea8a7ee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -82,8 +82,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private fun filterDifferentValues(configurationString: String) = hashOf(configurationString).let { if (it == lastConfigurationHash.get()) { + logger.trace { "No change detected in consul configuration" } Mono.empty() } else { + logger.info { "Obtained new configuration from consul:\n${configurationString}" } lastConfigurationHash.set(it) Mono.just(configurationString) } @@ -95,7 +97,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - logger.info { "Obtained new configuration from consul:\n${configuration}" } val routing = configuration.getJsonArray("collector.routing") return CollectorConfiguration( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index a0c22418..c4d6c87e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult @@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) val result = sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) + .doOnNext { + if (it.isSuccessful()) { + Mono.just(it) + } else { + logger.warn(it.exception()) { "Failed to send message to Kafka" } + Mono.empty<SenderResult<RoutedMessage>>() + } + } .map { it.correlationMetadata() } return if (logger.traceEnabled) { @@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM msg) } - private fun logException(senderResult: SenderResult<out Any>) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - private fun logSentMessage(sentMsg: RoutedMessage) { logger.trace { val msgNum = sentMessages.incrementAndGet() @@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } } - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + private fun SenderResult<out Any>.isSuccessful() = exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index e535300a..0b2997fa 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -57,8 +57,12 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> + logger.info("Collector configured with SSL enabled") this.secure { b -> b.sslContext(sslContext) } - }.getOrElse { this } + }.getOrElse { + logger.info("Collector configured with SSL disabled") + this + } private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = collectorProvider().fold( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index f5bfcce1..1965d78c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -20,13 +20,10 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) { - fun isValid(): Boolean = MessageValidator.isValid(this) -} +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bab95c57..437614ac 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -20,12 +20,21 @@ package org.onap.dcae.collectors.veshv.model import arrow.core.Option +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) + Option.fromNullable(routes.find { it.applies(commonHeader) }).also { + if (it.isEmpty()) { + logger.debug { "No route is defined for domain: ${commonHeader.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index 3090042d..60bd767b 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -19,21 +19,29 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Either.Companion.left +import arrow.core.Either.Companion.right +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.InvalidMajorVersion import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes import org.onap.ves.VesEventOuterClass.CommonEventHeader.* +import kotlin.test.assertTrue internal object MessageValidatorTest : Spek({ - given("Message validator") { + describe("Message validator") { val cut = MessageValidator on("ves hv message including header with fully initialized fields") { @@ -41,29 +49,35 @@ internal object MessageValidatorTest : Spek({ it("should accept message with fully initialized message header") { val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader)) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue() + } } - VesEventDomain.values() - .forEach { domain -> - it("should accept message with $domain domain") { - val header = commonHeader(domain) - val vesMessage = VesMessage(header, vesEventBytes(header)) - assertThat(cut.isValid(vesMessage)) - .isTrue() - } + VesEventDomain.values().forEach { domain -> + it("should accept message with $domain domain") { + val header = commonHeader(domain) + val vesMessage = VesMessage(header, vesEventBytes(header)) + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue() } + } + } } on("ves hv message bytes") { val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) it("should not accept message with default header") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } val priorityTestCases = mapOf( Priority.PRIORITY_NOT_PROVIDED to false, + Priority.LOW to true, + Priority.MEDIUM to true, Priority.HIGH to true ) @@ -73,8 +87,10 @@ internal object MessageValidatorTest : Spek({ val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) it("should resolve validation result") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation results") - .isEqualTo(expectedResult) + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation results") + .isEqualTo(expectedResult) + } } } } @@ -90,7 +106,9 @@ internal object MessageValidatorTest : Spek({ it("should not accept not fully initialized message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } @@ -101,7 +119,9 @@ internal object MessageValidatorTest : Spek({ it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } @@ -111,7 +131,10 @@ internal object MessageValidatorTest : Spek({ it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } @@ -121,8 +144,60 @@ internal object MessageValidatorTest : Spek({ it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } + } + } + + describe("validating messages and converting to Either of string for validation result") { + given("WireFrameMessage") { + on("valid message as input") { + val wireFrameMessage = WireFrameMessage("lets pretend it's valid".toByteArray()) + val mockedWireFrameMessage = mock<WireFrameMessage> { + on { validate() } doReturn right(wireFrameMessage) + } + + it("should be right") { + assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isRight()) + } + } + + on("invalid message as input") { + val mockedWireFrameMessage = mock<WireFrameMessage> { + on { validate() } doReturn left(InvalidMajorVersion(99)) + } + + it("should be left") { + assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isLeft()) + } + } } + + given("VesEvent") { + with(cut) { + on("valid message as input") { + val commonHeader = commonHeader() + val rawMessageBytes = vesEventBytes(commonHeader) + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + + it("should be right") { + assertTrue(validateProtobufMessage(vesMessage).isRight()) + } + } + } + on("invalid message as input") { + val commonHeader = newBuilder().build() + val rawMessageBytes = vesEventBytes(commonHeader) + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + + it("should be left") { + assertTrue(cut.validateProtobufMessage(vesMessage).isLeft()) + } + } + } + } } }) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 8950a557..cdee92c9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -20,6 +20,8 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import arrow.core.Try +import arrow.core.success import com.google.protobuf.ByteString import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -30,6 +32,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import reactor.test.test import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -68,7 +71,7 @@ internal object VesDecoderTest : Spek({ } }) -private fun <A> assertFailedWithError(option: Option<A>) = - option.exists { +private fun <A> assertFailedWithError(t: Try<A>) = + t.exists { fail("Error expected") - } + }
\ No newline at end of file |