diff options
Diffstat (limited to 'sources')
20 files changed, 397 insertions, 82 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index fb949079..93940752 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -19,20 +19,38 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Either +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader +typealias ValidationFailMessage = () -> String +typealias ValidationSuccessMessage = () -> String +typealias ValidationResult = Either<ValidationFailMessage, ValidationSuccessMessage> + internal object MessageValidator { - fun isValid(message: VesMessage): Boolean { - return allMandatoryFieldsArePresent(message.header) - } + fun validateFrameMessage(message: WireFrameMessage): ValidationResult = + message.validate().fold({ + Either.left { "Invalid wire frame header, reason: ${it.message}" } + }, { + Either.right { "Wire frame header is valid" } + }) + + fun validateProtobufMessage(message: VesMessage): ValidationResult = + if (message.isValid()) { + Either.right { "Protocol buffers message is valid" } + } else { + Either.left { "Unsupported protocol buffers message." } + } + + fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header) + .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = headerRequiredFieldDescriptors .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 1d43588f..c670e1d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try -import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -31,9 +30,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Option<VesMessage> = + fun decode(bytes: ByteData): Try<VesMessage> = Try { val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader VesMessage(decodedHeader, bytes) - }.toOption() + } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2f12e0cd..4176de99 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,17 +19,20 @@ */ package org.onap.dcae.collectors.veshv.impl -import arrow.core.Option +import arrow.core.Either import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog +import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -49,9 +52,9 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) + .transform(::filterInvalidWireFrame) + .transform(::decodeProtobufPayload) + .transform(::filterInvalidProtobufMessages) .transform(::routeMessage) .onErrorResume { logger.handleReactiveStreamError(it) } .doFinally { releaseBuffersMemory(wireDecoder) } @@ -63,26 +66,38 @@ internal class VesHvCollector( .concatMap(decoder::decode) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux + private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux + .filterFailedWithLog(MessageValidator::validateFrameMessage) + + private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } + .flatMap(::decodePayload) + + private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder + .decode(rawPayload) + .filterFailedWithLog(logger, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + + private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux + .filterFailedWithLog(MessageValidator::validateProtobufMessage) private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) - - private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( - { - logger.info("ommiting the message" + 5) - Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage) = router + .findDestination(msg) + .filterEmptyWithLog(logger, + { "Found route for message: ${it.topic}, partition: ${it.partition}" }, + { "Could not find route for message" }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() + .also { logger.debug("Released buffer memory after handling message stream") } + + fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) = + filterFailedWithLog(logger, predicate) companion object { private val logger = Logger(VesHvCollector::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index ec7c60c0..cea8a7ee 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -82,8 +82,10 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private fun filterDifferentValues(configurationString: String) = hashOf(configurationString).let { if (it == lastConfigurationHash.get()) { + logger.trace { "No change detected in consul configuration" } Mono.empty() } else { + logger.info { "Obtained new configuration from consul:\n${configurationString}" } lastConfigurationHash.set(it) Mono.just(configurationString) } @@ -95,7 +97,6 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, Json.createReader(StringReader(responseString)).readObject() private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - logger.info { "Obtained new configuration from consul:\n${configuration}" } val routing = configuration.getJsonArray("collector.routing") return CollectorConfiguration( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index a0c22418..c4d6c87e 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult @@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) val result = sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) + .doOnNext { + if (it.isSuccessful()) { + Mono.just(it) + } else { + logger.warn(it.exception()) { "Failed to send message to Kafka" } + Mono.empty<SenderResult<RoutedMessage>>() + } + } .map { it.correlationMetadata() } return if (logger.traceEnabled) { @@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM msg) } - private fun logException(senderResult: SenderResult<out Any>) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - private fun logSentMessage(sentMsg: RoutedMessage) { logger.trace { val msgNum = sentMessages.incrementAndGet() @@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } } - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + private fun SenderResult<out Any>.isSuccessful() = exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index e535300a..0b2997fa 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -57,8 +57,12 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, sslContextFactory .createSslContext(serverConfig.securityConfiguration) .map { sslContext -> + logger.info("Collector configured with SSL enabled") this.secure { b -> b.sslContext(sslContext) } - }.getOrElse { this } + }.getOrElse { + logger.info("Collector configured with SSL disabled") + this + } private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = collectorProvider().fold( diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index f5bfcce1..1965d78c 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -20,13 +20,10 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) { - fun isValid(): Boolean = MessageValidator.isValid(this) -} +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bab95c57..437614ac 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -20,12 +20,21 @@ package org.onap.dcae.collectors.veshv.model import arrow.core.Option +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) + Option.fromNullable(routes.find { it.applies(commonHeader) }).also { + if (it.isEmpty()) { + logger.debug { "No route is defined for domain: ${commonHeader.domain}" } + } + } + + companion object { + private val logger = Logger(Routing::class) + } } data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index 3090042d..60bd767b 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -19,21 +19,29 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Either.Companion.left +import arrow.core.Either.Companion.right +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.InvalidMajorVersion import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes import org.onap.ves.VesEventOuterClass.CommonEventHeader.* +import kotlin.test.assertTrue internal object MessageValidatorTest : Spek({ - given("Message validator") { + describe("Message validator") { val cut = MessageValidator on("ves hv message including header with fully initialized fields") { @@ -41,29 +49,35 @@ internal object MessageValidatorTest : Spek({ it("should accept message with fully initialized message header") { val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader)) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue() + } } - VesEventDomain.values() - .forEach { domain -> - it("should accept message with $domain domain") { - val header = commonHeader(domain) - val vesMessage = VesMessage(header, vesEventBytes(header)) - assertThat(cut.isValid(vesMessage)) - .isTrue() - } + VesEventDomain.values().forEach { domain -> + it("should accept message with $domain domain") { + val header = commonHeader(domain) + val vesMessage = VesMessage(header, vesEventBytes(header)) + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue() } + } + } } on("ves hv message bytes") { val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) it("should not accept message with default header") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } val priorityTestCases = mapOf( Priority.PRIORITY_NOT_PROVIDED to false, + Priority.LOW to true, + Priority.MEDIUM to true, Priority.HIGH to true ) @@ -73,8 +87,10 @@ internal object MessageValidatorTest : Spek({ val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) it("should resolve validation result") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation results") - .isEqualTo(expectedResult) + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation results") + .isEqualTo(expectedResult) + } } } } @@ -90,7 +106,9 @@ internal object MessageValidatorTest : Spek({ it("should not accept not fully initialized message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } @@ -101,7 +119,9 @@ internal object MessageValidatorTest : Spek({ it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } @@ -111,7 +131,10 @@ internal object MessageValidatorTest : Spek({ it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } } } @@ -121,8 +144,60 @@ internal object MessageValidatorTest : Spek({ it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + + with(cut) { + assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse() + } + } + } + + describe("validating messages and converting to Either of string for validation result") { + given("WireFrameMessage") { + on("valid message as input") { + val wireFrameMessage = WireFrameMessage("lets pretend it's valid".toByteArray()) + val mockedWireFrameMessage = mock<WireFrameMessage> { + on { validate() } doReturn right(wireFrameMessage) + } + + it("should be right") { + assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isRight()) + } + } + + on("invalid message as input") { + val mockedWireFrameMessage = mock<WireFrameMessage> { + on { validate() } doReturn left(InvalidMajorVersion(99)) + } + + it("should be left") { + assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isLeft()) + } + } } + + given("VesEvent") { + with(cut) { + on("valid message as input") { + val commonHeader = commonHeader() + val rawMessageBytes = vesEventBytes(commonHeader) + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + + it("should be right") { + assertTrue(validateProtobufMessage(vesMessage).isRight()) + } + } + } + on("invalid message as input") { + val commonHeader = newBuilder().build() + val rawMessageBytes = vesEventBytes(commonHeader) + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + + it("should be left") { + assertTrue(cut.validateProtobufMessage(vesMessage).isLeft()) + } + } + } + } } }) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 8950a557..cdee92c9 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -20,6 +20,8 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Option +import arrow.core.Try +import arrow.core.success import com.google.protobuf.ByteString import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given @@ -30,6 +32,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import reactor.test.test import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -68,7 +71,7 @@ internal object VesDecoderTest : Spek({ } }) -private fun <A> assertFailedWithError(option: Option<A>) = - option.exists { +private fun <A> assertFailedWithError(t: Try<A>) = + t.exists { fail("Error expected") - } + }
\ No newline at end of file diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml index 48da3b18..09ac3573 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml +++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml @@ -27,7 +27,7 @@ </appender> <logger name="org.onap.dcae.collectors.veshv" level="INFO"/> - <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> + <!--<logger name="reactor.netty" level="DEBUG"/>--> <root level="INFO"> <appender-ref ref="CONSOLE"/> diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt index 7cbf3530..e4a1dd85 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt @@ -27,7 +27,7 @@ enum class PayloadContentType(val hexValue: Int) { GOOGLE_PROTOCOL_BUFFER(0x0001); companion object { - private val hexValues = PayloadContentType.values().map { it.hexValue } + val hexValues = PayloadContentType.values().map { it.hexValue } fun isValidHexValue(hex: Int) = hexValues.contains(hex) } diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index 0d55cebb..4d60d62c 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -46,3 +46,19 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") + +// WireFrameMessage validation exceptions + +sealed class WireFrameMessageValidationError(val message: String) + +class InvalidMajorVersion(actualVersion: Short) : WireFrameMessageValidationError( + "Invalid major version in wire frame header. " + + "Expected ${WireFrameMessage.SUPPORTED_VERSION_MAJOR} but was $actualVersion") + +class UnsupportedPayloadContentType(actualType: Int) : WireFrameMessageValidationError( + "Invalid content type in wire frame header. " + + "Expected one of ${PayloadContentType.hexValues}, but was $actualType") + +class NotMatchingPayloadSize(definedInHeader: Int, actual: Int) : WireFrameMessageValidationError( + "Payload size does not match one defined in wire frame header.\n" + + "Defined in header: $definedInHeader, but was: $actual") diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index de37b140..1257c6bb 100644 --- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -19,6 +19,11 @@ */ package org.onap.dcae.collectors.veshv.domain +import arrow.core.Either +import arrow.core.Either.Companion.left +import arrow.core.Either.Companion.right + + /** * Wire frame structure is presented bellow using ASN.1 notation. Please note that official supported specification * should be available on @@ -62,10 +67,13 @@ data class WireFrameMessage(val payload: ByteData, PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payload.size) - fun isValid(): Boolean = - versionMajor == SUPPORTED_VERSION_MAJOR - && PayloadContentType.isValidHexValue(payloadType) - && payload.size() == payloadSize + fun validate(): Either<WireFrameMessageValidationError, WireFrameMessage> = + when { + versionMajor != SUPPORTED_VERSION_MAJOR -> left(InvalidMajorVersion(versionMajor)) + !PayloadContentType.isValidHexValue(payloadType) -> left(UnsupportedPayloadContentType(payloadType)) + payload.size() != payloadSize -> left(NotMatchingPayloadSize(payload.size(), payloadSize)) + else -> right(this) + } companion object { const val MARKER_BYTE: Short = 0xAA diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index f17a79ba..f8fbc0a3 100644 --- a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -60,7 +60,7 @@ object WireFrameCodecsTest : Spek({ payloadSize = 0) it("should fail validation") { - assertThat(input.isValid()).isFalse() + input.validate().assertFailedWithError { it.isInstanceOf(InvalidMajorVersion::class.java) } } } @@ -73,7 +73,7 @@ object WireFrameCodecsTest : Spek({ payloadSize = 0) it("should pass validation") { - assertThat(input.isValid()).isTrue() + assertTrue(input.validate().isRight()) } } @@ -86,7 +86,7 @@ object WireFrameCodecsTest : Spek({ payloadSize = 0) it("should fail validation") { - assertThat(input.isValid()).isFalse() + input.validate().assertFailedWithError { it.isInstanceOf(UnsupportedPayloadContentType::class.java) } } } @@ -99,7 +99,7 @@ object WireFrameCodecsTest : Spek({ payloadSize = 1) it("should fail validation") { - assertThat(input.isValid()).isFalse() + input.validate().assertFailedWithError { it.isInstanceOf(NotMatchingPayloadSize::class.java) } } } @@ -112,7 +112,7 @@ object WireFrameCodecsTest : Spek({ payloadSize = 8) it("should fail validation") { - assertThat(input.isValid()).isFalse() + input.validate().assertFailedWithError { it.isInstanceOf(NotMatchingPayloadSize::class.java) } } } @@ -126,7 +126,7 @@ object WireFrameCodecsTest : Spek({ payloadSize = payload.size) it("should pass validation") { - assertThat(input.isValid()).isTrue() + assertTrue(input.validate().isRight()) } } @@ -214,7 +214,7 @@ object WireFrameCodecsTest : Spek({ .writeByte(0xAB) val decoded = decoder.decodeFirst(buff).getMessageOrFail() - assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertTrue(decoded.validate().isRight(), "should be valid") assertThat(buff.readableBytes()).isEqualTo(1) } } diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index f12d9acf..ac282008 100644 --- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.netty.http.server.HttpServer @@ -55,9 +56,15 @@ class HealthCheckApiServer(private val healthState: HealthState, private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) = healthDescription.get().run { + logger.debug { "HV-VES status: $status, $message" } resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message)) } private fun livenessHandler(_req: HttpServerRequest, resp: HttpServerResponse) = resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet")) + + companion object { + private val logger = Logger(HealthCheckApiServer::class) + } + } diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index c76ff21a..bee0dae1 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -36,11 +36,14 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/> - <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/> + <logger name="reactor.netty" level="WARN"/> + <logger name="io.netty" level="DEBUG"/> + <logger name="io.netty.util" level="WARN"/> + <logger name="org.apache.kafka" level="WARN"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt index 714702d3..e8ec2549 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt @@ -19,10 +19,48 @@ */ package org.onap.dcae.collectors.veshv.utils.logging +import arrow.core.Either +import arrow.core.Option +import arrow.core.Try import reactor.core.publisher.Flux +import reactor.core.publisher.Mono fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> { - logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})") logger.debug("Detailed stack trace", ex) return returnFlux } + + +fun <T> Try<T>.filterFailedWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: (Throwable) -> String): Flux<T> = + fold({ + logger.warn(rejectedMsg(it)) + Flux.empty<T>() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun <T> Option<T>.filterEmptyWithLog(logger: Logger, + acceptedMsg: (T) -> String, + rejectedMsg: () -> String): Flux<T> = + fold({ + logger.warn(rejectedMsg) + Flux.empty<T>() + }, { + logger.trace { acceptedMsg(it) } + Flux.just(it) + }) + +fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) = + flatMap { t -> + predicate(t).fold({ + logger.warn(it) + Mono.empty<T>() + }, { + logger.trace(it) + Mono.just<T>(t) + }) + } diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt new file mode 100644 index 00000000..0f359df3 --- /dev/null +++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.logging + +import arrow.core.Either +import arrow.core.Failure +import arrow.core.Option +import arrow.core.Try +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import reactor.core.publisher.Flux +import reactor.test.test +import kotlin.test.fail + +class ReactiveLoggingTest : Spek({ + + describe("filtering with log message") { + val logger = Logger("React") + val event = 5 + + describe("Try") { + given("successful Try") { + val cut = Try.just(event) + + it("should not filter stream event and log accepted message") { + cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + .test() + .expectNext(event) + .verifyComplete() + } + } + + given("failed Try") { + val e = Exception() + val cut = Failure(e) + it("should filter stream event and log rejected message") { + cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE) + .test() + .verifyComplete() + } + } + } + + describe("Option") { + given("Option with content") { + val cut = Option.just(event) + + it("should not filter stream event and log accepted message") { + cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + .test() + .expectNext(event) + .verifyComplete() + } + } + + given("empty Option") { + val cut = Option.empty<Int>() + it("should filter stream event and log rejected message") { + cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE) + .test() + .verifyComplete() + } + } + } + + + describe("Either") { + given("successful Either (right)") { + val cut = Flux.just(event) + + it("should not filter stream event and log accepted message") { + cut.filterFailedWithLog(logger, right()) + .test() + .expectNext(event) + .verifyComplete() + } + } + + given("failed Either (left)") { + val cut = Flux.just(event) + + it("should filter stream event and log rejected message") { + cut.filterFailedWithLog(logger, left()) + .test() + .verifyComplete() + } + } + } + } +}) + + +val ACCEPTED_MESSAGE: (Int) -> String = { "SUCCESS" } +val FAILED_MESSAGE: () -> String = { "FAILED" } +val FAILED_WITH_EXCEPTION_MESSAGE: (Throwable) -> String = { "FAILED" } + +private fun right(): (Int) -> Either<() -> String, () -> String> = + { Either.cond(true, { { "SUCCESS" } }, { fail() }) } + +private fun left(): (Int) -> Either<() -> String, () -> String> = + { Either.cond(false, { fail() }, { FAILED_MESSAGE }) } diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt index e2aec7df..930f020b 100644 --- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt +++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt @@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.VesEvent import reactor.test.test +import kotlin.test.assertTrue /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -103,7 +104,7 @@ object MessageGeneratorImplTest : Spek({ ))) .test() .assertNext { - assertThat(it.isValid()).isTrue() + assertTrue(it.validate().isRight()) assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName) } @@ -122,7 +123,7 @@ object MessageGeneratorImplTest : Spek({ ))) .test() .assertNext { - assertThat(it.isValid()).isTrue() + assertTrue(it.validate().isRight()) assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName) } @@ -140,7 +141,7 @@ object MessageGeneratorImplTest : Spek({ ))) .test() .assertNext { - assertThat(it.isValid()).isTrue() + assertTrue(it.validate().isRight()) assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThatExceptionOfType(InvalidProtocolBufferException::class.java) .isThrownBy { extractCommonEventHeader(it.payload) } @@ -159,7 +160,7 @@ object MessageGeneratorImplTest : Spek({ ))) .test() .assertNext { - assertThat(it.isValid()).isFalse() + assertTrue(it.validate().isLeft()) assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName) assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR) @@ -178,7 +179,7 @@ object MessageGeneratorImplTest : Spek({ ))) .test() .assertNext { - assertThat(it.isValid()).isTrue() + assertTrue(it.validate().isRight()) assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName) |