From 07bbbf71cd65b29f446a1b475add87f20365db83 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 7 Jun 2018 11:52:16 +0200 Subject: Fix TCP stream framing issue Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 3 +- .../collectors/veshv/factory/CollectorFactory.kt | 4 +- .../dcae/collectors/veshv/impl/MessageValidator.kt | 3 +- .../onap/dcae/collectors/veshv/impl/VesDecoder.kt | 17 +--- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 57 ++++++-------- .../onap/dcae/collectors/veshv/impl/WireDecoder.kt | 44 ----------- .../veshv/impl/adapters/AdapterFactory.kt | 40 ---------- .../veshv/impl/adapters/KafkaSinkProvider.kt | 48 +++++++++++ .../veshv/impl/adapters/LoggingSinkProvider.kt | 64 +++++++++++++++ .../collectors/veshv/impl/socket/NettyTcpServer.kt | 6 +- .../veshv/impl/wire/StreamBufferEmitter.kt | 74 +++++++++++++++++ .../dcae/collectors/veshv/impl/wire/WireDecoder.kt | 56 +++++++++++++ .../collectors/veshv/impl/wire/WireFrameSink.kt | 92 ++++++++++++++++++++++ 13 files changed, 371 insertions(+), 137 deletions(-) delete mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index dfbbdb56..ed686fe8 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -20,12 +20,13 @@ package org.onap.dcae.collectors.veshv.boundary import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ServerConfiguration import reactor.core.publisher.Flux import reactor.core.publisher.Mono interface Collector { - fun handleConnection(dataStream: Flux): Mono + fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux): Mono } typealias CollectorProvider = () -> Collector diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 850d3a84..913d8f50 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference @@ -48,7 +48,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - WireDecoder(), + { WireDecoder(it) }, VesDecoder(), MessageValidator(), Router(config.routing), diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index b0a9da81..12e1c1e6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -39,7 +39,8 @@ internal class MessageValidator { fun isValid(message: VesMessage): Boolean { val header = message.header - return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + return ret } private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index cdc70f82..60e7d70a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl -import com.google.protobuf.InvalidProtocolBufferException import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -31,18 +30,8 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bb: ByteBuf): VesMessage? = - try { - val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader - VesMessage(decodedHeader, bb) - } catch (ex: InvalidProtocolBufferException) { - logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" } - logger.debug("Cause", ex) - null - } - - - companion object { - private val logger = Logger(VesDecoder::class) + fun decode(bb: ByteBuf): VesMessage { + val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader + return VesMessage(decodedHeader, bb) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 535fbe12..ac11b3e8 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -20,47 +20,43 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicInteger /** * @author Piotr Jaszczyk * @since May 2018 */ internal class VesHvCollector( - private val wireDecoder: WireDecoder, + private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder, private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, private val sink: Sink) : Collector { - override fun handleConnection(dataStream: Flux): Mono = - dataStream - .doOnNext(this::logIncomingMessage) - .flatMap(this::decodeWire) - .doOnNext(this::logDecodedWireMessage) - .flatMap(this::decodeProtobuf) - .filter(this::validate) - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext(this::releaseMemory) - .then() - private fun logIncomingMessage(wire: ByteBuf) { - logger.debug { "Got message with total ${wire.readableBytes()} B"} - } - - private fun logDecodedWireMessage(payload: ByteBuf) { - logger.debug { "Wire payload size: ${payload.readableBytes()} B"} - } - - private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) - - private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) + override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux): Mono = + wireDecoderSupplier(alloc).let { wireDecoder -> + dataStream + .concatMap(wireDecoder::decode) + .filter(WireFrame::isValid) + .map(WireFrame::payload) + .map(protobufDecoder::decode) + .filter(this::validate) + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext(this::releaseMemory) + .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .then() + } private fun validate(msg: VesMessage): Boolean { val valid = validator.isValid(msg) @@ -73,21 +69,16 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage): Mono = omitWhenNull(msg, router::findDestination) private fun releaseMemory(msg: VesMessage) { + logger.trace { "Releasing memory from ${msg.rawMessage}" } msg.rawMessage.release() } - private fun omitWhenNull(input: T, mapper: (T) -> V?): Mono = Mono.justOrEmpty(mapper(input)) - - private fun releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono { - val result = mapper(input) - return if (result == null) { - input.release() - Mono.empty() - } else { - Mono.just(result) - } + private fun releaseBuffersMemory(wireDecoder: WireDecoder) { + wireDecoder.release() } + private fun omitWhenNull(input: T, mapper: (T) -> V?): Mono = Mono.justOrEmpty(mapper(input)) + companion object { val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt deleted file mode 100644 index 6f6ac2a7..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import io.netty.buffer.ByteBuf -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.utils.logging.Logger - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class WireDecoder { - fun decode(byteBuf: ByteBuf): ByteBuf? = - try { - WireFrame.decode(byteBuf) - .takeIf { it.isValid() } - .let { it?.payload } - } catch (ex: IndexOutOfBoundsException) { - logger.debug { "Wire protocol frame could not be decoded - input is too small" } - null - } - - companion object { - private val logger = Logger(WireDecoder::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 0aacb266..8e6db2af 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -19,24 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.common.serialization.ByteBufferSerializer -import org.apache.kafka.common.serialization.StringSerializer import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.nio.ByteBuffer /** * @author Piotr Jaszczyk @@ -51,33 +38,6 @@ object AdapterFactory { override fun invoke() = Flux.just(config) } - private class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - val sender = KafkaSender.create( - SenderOptions.create() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)) - return KafkaSink(sender) - } - } - - - private class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return object : Sink { - private val logger = Logger(LoggingSinkProvider::class) - override fun send(messages: Flux): Flux = - messages - .doOnNext { msg -> - logger.info { "Message routed to ${msg.topic}" } - } - .map { it.message } - - } - } - } - fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt new file mode 100644 index 00000000..82452e1e --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.ByteBufferSerializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.ves.VesEventV5 +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +internal class KafkaSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + } + + private fun constructSenderOptions(config: CollectorConfiguration) = + SenderOptions.create() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java) + +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt new file mode 100644 index 00000000..62b6d1aa --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +internal class LoggingSinkProvider : SinkProvider { + + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + private val totalMessages = AtomicLong() + private val totalBytes = AtomicLong() + + override fun send(messages: Flux): Flux = + messages + .doOnNext(this::logMessage) + .map { it.message } + + private fun logMessage(msg: RoutedMessage) { + val msgs = totalMessages.addAndGet(1) + val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong()) + val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } + if (msgs % INFO_LOGGING_FREQ == 0L) + logger.info(logMessageSupplier) + else + logger.trace(logMessageSupplier) + } + + } + } + + companion object { + const val INFO_LOGGING_FREQ = 100_000 + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 208b1ba0..564aa8df 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -59,17 +60,18 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono { logger.debug("Got connection") + nettyOutbound.alloc() val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + val handleIncomingMessages = collectorProvider() + .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain()) return sendHello.then(handleIncomingMessages) } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt new file mode 100644 index 00000000..e4dd7cf6 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.CompositeByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import reactor.core.publisher.FluxSink +import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class StreamBufferEmitter( + private val streamBuffer: CompositeByteBuf, + private val newFrame: ByteBuf) + : Consumer> { + + private val subscribed = AtomicBoolean(false) + + override fun accept(sink: FluxSink) { + when { + + subscribed.getAndSet(true) -> + sink.error(IllegalStateException("Wire frame emitter supports only one subscriber")) + + newFrame.readableBytes() == 0 -> { + logger.trace { "Discarding empty buffer" } + newFrame.release() + sink.complete() + } + + else -> { + streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) + sink.onDispose { + logger.debug("Disposing read components") + streamBuffer.discardReadComponents() + } + sink.onRequest { requestedFrameCount -> + WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber() + } + } + } + } + + companion object { + fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux = + Flux.create(StreamBufferEmitter(streamBuffer, newFrame)) + + private const val INCREASE_WRITER_INDEX = true + private val logger = Logger(StreamBufferEmitter::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt new file mode 100644 index 00000000..b701aaf2 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { + private val streamBuffer = alloc.compositeBuffer() + + fun decode(byteBuf: ByteBuf): Flux = StreamBufferEmitter.createFlux(streamBuffer, byteBuf) + .doOnSubscribe { logIncomingMessage(byteBuf) } + .doOnNext(this::logDecodedWireMessage) + + fun release() { + streamBuffer.release() + } + + + private fun logIncomingMessage(wire: ByteBuf) { + logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + } + + private fun logDecodedWireMessage(wire: WireFrame) { + logger.trace { "Wire payload size: ${wire.payloadSize} B." } + } + + companion object { + val logger = Logger(VesHvCollector::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt new file mode 100644 index 00000000..bc9c8389 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -0,0 +1,92 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.FluxSink + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class WireFrameSink( + private val streamBuffer: ByteBuf, + private val sink: FluxSink, + private val requestedFrameCount: Long) { + + fun handleSubscriber() { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + sink.error(ex) + } + + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + + } + + private fun pushAvailableFrames() { + var nextFrame = decodeFirstFrameFromBuffer() + while (nextFrame != null && !sink.isCancelled) { + sink.next(nextFrame) + nextFrame = decodeFirstFrameFromBuffer() + } + sink.complete() + } + + private fun pushUpToNumberOfFrames() { + var nextFrame = decodeFirstFrameFromBuffer() + var remaining = requestedFrameCount + loop@ while (nextFrame != null && !sink.isCancelled) { + sink.next(nextFrame) + if (--remaining > 0) { + nextFrame = decodeFirstFrameFromBuffer() + } else { + break@loop + } + } + if (remaining > 0 && nextFrame == null) { + sink.complete() + } + } + + private fun decodeFirstFrameFromBuffer(): WireFrame? = + try { + WireFrame.decodeFirst(streamBuffer) + } catch (ex: MissingWireFrameBytesException) { + logger.debug { "${ex.message} - waiting for more data" } + null + } + + companion object { + private val logger = Logger(WireFrameSink::class) + } +} -- cgit 1.2.3-korg