diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-07 11:52:16 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 13:06:43 +0200 |
commit | 07bbbf71cd65b29f446a1b475add87f20365db83 (patch) | |
tree | e64fcf12c21e46358043744476d68765634d7f6f /hv-collector-core | |
parent | 767d0464a19e0949d2919e6df15c9653dec50503 (diff) |
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer
snapshots - not separate messages. That means that we need to join
incomming buffers and then split it into separate WireFrames.
Closes ONAP-312
Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
16 files changed, 578 insertions, 212 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index dfbbdb56..ed686fe8 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -20,12 +20,13 @@ package org.onap.dcae.collectors.veshv.boundary import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ServerConfiguration import reactor.core.publisher.Flux import reactor.core.publisher.Mono interface Collector { - fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> + fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> } typealias CollectorProvider = () -> Collector diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 850d3a84..913d8f50 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference @@ -48,7 +48,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - WireDecoder(), + { WireDecoder(it) }, VesDecoder(), MessageValidator(), Router(config.routing), diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index b0a9da81..12e1c1e6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -39,7 +39,8 @@ internal class MessageValidator { fun isValid(message: VesMessage): Boolean { val header = message.header - return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + return ret } private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index cdc70f82..60e7d70a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl -import com.google.protobuf.InvalidProtocolBufferException import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -31,18 +30,8 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bb: ByteBuf): VesMessage? = - try { - val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader - VesMessage(decodedHeader, bb) - } catch (ex: InvalidProtocolBufferException) { - logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" } - logger.debug("Cause", ex) - null - } - - - companion object { - private val logger = Logger(VesDecoder::class) + fun decode(bb: ByteBuf): VesMessage { + val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader + return VesMessage(decodedHeader, bb) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 535fbe12..ac11b3e8 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -20,47 +20,43 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicInteger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ internal class VesHvCollector( - private val wireDecoder: WireDecoder, + private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder, private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, private val sink: Sink) : Collector { - override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = - dataStream - .doOnNext(this::logIncomingMessage) - .flatMap(this::decodeWire) - .doOnNext(this::logDecodedWireMessage) - .flatMap(this::decodeProtobuf) - .filter(this::validate) - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext(this::releaseMemory) - .then() - private fun logIncomingMessage(wire: ByteBuf) { - logger.debug { "Got message with total ${wire.readableBytes()} B"} - } - - private fun logDecodedWireMessage(payload: ByteBuf) { - logger.debug { "Wire payload size: ${payload.readableBytes()} B"} - } - - private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) - - private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) + override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = + wireDecoderSupplier(alloc).let { wireDecoder -> + dataStream + .concatMap(wireDecoder::decode) + .filter(WireFrame::isValid) + .map(WireFrame::payload) + .map(protobufDecoder::decode) + .filter(this::validate) + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext(this::releaseMemory) + .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .then() + } private fun validate(msg: VesMessage): Boolean { val valid = validator.isValid(msg) @@ -73,21 +69,16 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) private fun releaseMemory(msg: VesMessage) { + logger.trace { "Releasing memory from ${msg.rawMessage}" } msg.rawMessage.release() } - private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) - - private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { - val result = mapper(input) - return if (result == null) { - input.release() - Mono.empty() - } else { - Mono.just(result) - } + private fun releaseBuffersMemory(wireDecoder: WireDecoder) { + wireDecoder.release() } + private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) + companion object { val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 0aacb266..8e6db2af 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -19,24 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.common.serialization.ByteBufferSerializer -import org.apache.kafka.common.serialization.StringSerializer import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.nio.ByteBuffer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -51,33 +38,6 @@ object AdapterFactory { override fun invoke() = Flux.just(config) } - private class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - val sender = KafkaSender.create( - SenderOptions.create<CommonEventHeader, ByteBuffer>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)) - return KafkaSink(sender) - } - } - - - private class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return object : Sink { - private val logger = Logger(LoggingSinkProvider::class) - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = - messages - .doOnNext { msg -> - logger.info { "Message routed to ${msg.topic}" } - } - .map { it.message } - - } - } - } - fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt new file mode 100644 index 00000000..82452e1e --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.ByteBufferSerializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.ves.VesEventV5 +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +internal class KafkaSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + } + + private fun constructSenderOptions(config: CollectorConfiguration) = + SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java) + +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt new file mode 100644 index 00000000..62b6d1aa --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +internal class LoggingSinkProvider : SinkProvider { + + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + private val totalMessages = AtomicLong() + private val totalBytes = AtomicLong() + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + messages + .doOnNext(this::logMessage) + .map { it.message } + + private fun logMessage(msg: RoutedMessage) { + val msgs = totalMessages.addAndGet(1) + val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong()) + val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } + if (msgs % INFO_LOGGING_FREQ == 0L) + logger.info(logMessageSupplier) + else + logger.trace(logMessageSupplier) + } + + } + } + + companion object { + const val INFO_LOGGING_FREQ = 100_000 + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 208b1ba0..564aa8df 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -59,17 +60,18 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { logger.debug("Got connection") + nettyOutbound.alloc() val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + val handleIncomingMessages = collectorProvider() + .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain()) return sendHello.then(handleIncomingMessages) } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt new file mode 100644 index 00000000..e4dd7cf6 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.CompositeByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import reactor.core.publisher.FluxSink +import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class StreamBufferEmitter( + private val streamBuffer: CompositeByteBuf, + private val newFrame: ByteBuf) + : Consumer<FluxSink<WireFrame>> { + + private val subscribed = AtomicBoolean(false) + + override fun accept(sink: FluxSink<WireFrame>) { + when { + + subscribed.getAndSet(true) -> + sink.error(IllegalStateException("Wire frame emitter supports only one subscriber")) + + newFrame.readableBytes() == 0 -> { + logger.trace { "Discarding empty buffer" } + newFrame.release() + sink.complete() + } + + else -> { + streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) + sink.onDispose { + logger.debug("Disposing read components") + streamBuffer.discardReadComponents() + } + sink.onRequest { requestedFrameCount -> + WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber() + } + } + } + } + + companion object { + fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> = + Flux.create(StreamBufferEmitter(streamBuffer, newFrame)) + + private const val INCREASE_WRITER_INDEX = true + private val logger = Logger(StreamBufferEmitter::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt index 6f6ac2a7..b701aaf2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt @@ -17,28 +17,40 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl +package org.onap.dcae.collectors.veshv.impl.wire import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class WireDecoder { - fun decode(byteBuf: ByteBuf): ByteBuf? = - try { - WireFrame.decode(byteBuf) - .takeIf { it.isValid() } - .let { it?.payload } - } catch (ex: IndexOutOfBoundsException) { - logger.debug { "Wire protocol frame could not be decoded - input is too small" } - null - } +internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { + private val streamBuffer = alloc.compositeBuffer() + + fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf) + .doOnSubscribe { logIncomingMessage(byteBuf) } + .doOnNext(this::logDecodedWireMessage) + + fun release() { + streamBuffer.release() + } + + + private fun logIncomingMessage(wire: ByteBuf) { + logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + } + + private fun logDecodedWireMessage(wire: WireFrame) { + logger.trace { "Wire payload size: ${wire.payloadSize} B." } + } companion object { - private val logger = Logger(WireDecoder::class) + val logger = Logger(VesHvCollector::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt new file mode 100644 index 00000000..bc9c8389 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -0,0 +1,92 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.FluxSink + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class WireFrameSink( + private val streamBuffer: ByteBuf, + private val sink: FluxSink<WireFrame>, + private val requestedFrameCount: Long) { + + fun handleSubscriber() { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + sink.error(ex) + } + + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + + } + + private fun pushAvailableFrames() { + var nextFrame = decodeFirstFrameFromBuffer() + while (nextFrame != null && !sink.isCancelled) { + sink.next(nextFrame) + nextFrame = decodeFirstFrameFromBuffer() + } + sink.complete() + } + + private fun pushUpToNumberOfFrames() { + var nextFrame = decodeFirstFrameFromBuffer() + var remaining = requestedFrameCount + loop@ while (nextFrame != null && !sink.isCancelled) { + sink.next(nextFrame) + if (--remaining > 0) { + nextFrame = decodeFirstFrameFromBuffer() + } else { + break@loop + } + } + if (remaining > 0 && nextFrame == null) { + sink.complete() + } + } + + private fun decodeFirstFrameFromBuffer(): WireFrame? = + try { + WireFrame.decodeFirst(streamBuffer) + } catch (ex: MissingWireFrameBytesException) { + logger.debug { "${ex.message} - waiting for more data" } + null + } + + companion object { + private val logger = Logger(WireFrameSink::class) + } +} diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 8d9e4962..263ad441 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -20,8 +20,10 @@ package org.onap.dcae.collectors.veshv.impl import com.google.protobuf.ByteString +import com.google.protobuf.InvalidProtocolBufferException import io.netty.buffer.Unpooled.wrappedBuffer import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it @@ -60,8 +62,9 @@ internal object VesDecoderTest : Spek({ on("invalid ves hv message bytes") { val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset())) - it("should return empty result") { - assertThat(cut.decode(rawMessageBytes)).isNull() + it("should throw error") { + assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { cut.decode(rawMessageBytes) } } } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt deleted file mode 100644 index 81706ce4..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt +++ /dev/null @@ -1,104 +0,0 @@ -package org.onap.dcae.collectors.veshv.impl - -import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrame - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> - * @since May 2018 - */ -internal object WireDecoderTest : Spek({ - describe("decoding wire protocol") { - val cut = WireDecoder() - - fun decode(frame: WireFrame) = - cut.decode( - frame.encode(UnpooledByteBufAllocator.DEFAULT)) - - given("empty input") { - val input = Unpooled.EMPTY_BUFFER - - it("should yield empty result") { - assertThat(cut.decode(input)).isNull() - } - } - - given("input without 0xFF first byte") { - val input = WireFrame( - payload = Unpooled.EMPTY_BUFFER, - mark = 0x10, - majorVersion = 1, - minorVersion = 2, - payloadSize = 0) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("input with unsupported major version") { - val input = WireFrame( - payload = Unpooled.EMPTY_BUFFER, - mark = 0xFF, - majorVersion = 100, - minorVersion = 2, - payloadSize = 0) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("input with too small payload size") { - val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), - mark = 0xFF, - majorVersion = 1, - minorVersion = 0, - payloadSize = 1) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("input with too big payload size") { - val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), - mark = 0xFF, - majorVersion = 1, - minorVersion = 0, - payloadSize = 8) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("valid input") { - val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( - payload = Unpooled.wrappedBuffer(payload), - mark = 0xFF, - majorVersion = 1, - minorVersion = 0, - payloadSize = payload.size) - - - it("should yield Google Protocol Buffers payload") { - val result = decode(input)!! - - val actualPayload = ByteArray(result.readableBytes()) - result.readBytes(actualPayload) - - assertThat(actualPayload).containsExactly(*payload) - } - } - } -}) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt new file mode 100644 index 00000000..0a10aa1f --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt @@ -0,0 +1,233 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import reactor.test.test + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since May 2018 + */ +internal object WireDecoderTest : Spek({ + val alloc = UnpooledByteBufAllocator.DEFAULT + val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() + val anotherPayload = "ala ma kota a kot ma ale".toByteArray() + + fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc)) + + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should be released: $bb ref count") + .isEqualTo(0) + } + } + + fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should not be released: $bb ref count") + .isEqualTo(1) + } + } + + describe("decoding wire protocol") { + given("empty input") { + val input = Unpooled.EMPTY_BUFFER + + it("should yield empty result") { + WireDecoder().decode(input).test().verifyComplete() + } + } + + given("input with no readable bytes") { + val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) + + it("should yield empty result") { + WireDecoder().decode(input).test().verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input) + } + } + + given("invalid input (not starting with marker)") { + val input = Unpooled.wrappedBuffer(samplePayload) + + it("should yield error") { + WireDecoder().decode(input).test() + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input") { + val input = WireFrame(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded input frame") { + WireDecoder().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + } + + given("valid input with part of next frame") { + val input = Unpooled.buffer() + .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) + .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3)) + + it("should yield decoded input frame") { + WireDecoder().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input with garbage after it") { + val input = Unpooled.buffer() + .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) + .writeBytes(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded input frame and error") { + WireDecoder().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("two inputs containing two separate messages") { + val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing 1st frame and 2nd input containing garbage") { + val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val input2 = Unpooled.wrappedBuffer(anotherPayload) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1) + .doOnNext { + // releasing retained payload + it.payload.release() + } + .test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should release memory for 1st input") { + verifyMemoryReleased(input1) + } + + it("should leave memory unreleased for 2nd input") { + verifyMemoryNotReleased(input2) + } + } + + + given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { + val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + + val input1 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2, 3) + val input2 = Unpooled.buffer().writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { + val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + + val input1 = Unpooled.buffer() + .writeBytes(frame1, 5) + val input2 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1).test() + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + } +}) diff --git a/hv-collector-core/src/test/resources/logback.xml b/hv-collector-core/src/test/resources/logback-test.xml index 809f62d4..84abc9d3 100644 --- a/hv-collector-core/src/test/resources/logback.xml +++ b/hv-collector-core/src/test/resources/logback-test.xml @@ -26,7 +26,7 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> |