aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 11:52:16 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 13:06:43 +0200
commit07bbbf71cd65b29f446a1b475add87f20365db83 (patch)
treee64fcf12c21e46358043744476d68765634d7f6f /hv-collector-core/src/main/kotlin
parent767d0464a19e0949d2919e6df15c9653dec50503 (diff)
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt57
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt48
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt64
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt74
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt (renamed from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt)36
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt92
12 files changed, 339 insertions, 105 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index dfbbdb56..ed686fe8 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -20,12 +20,13 @@
package org.onap.dcae.collectors.veshv.boundary
import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
interface Collector {
- fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
+ fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
typealias CollectorProvider = () -> Collector
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 850d3a84..913d8f50 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicReference
@@ -48,7 +48,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- WireDecoder(),
+ { WireDecoder(it) },
VesDecoder(),
MessageValidator(),
Router(config.routing),
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index b0a9da81..12e1c1e6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -39,7 +39,8 @@ internal class MessageValidator {
fun isValid(message: VesMessage): Boolean {
val header = message.header
- return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+ val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+ return ret
}
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index cdc70f82..60e7d70a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import com.google.protobuf.InvalidProtocolBufferException
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -31,18 +30,8 @@ import org.onap.ves.VesEventV5.VesEvent
*/
internal class VesDecoder {
- fun decode(bb: ByteBuf): VesMessage? =
- try {
- val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
- VesMessage(decodedHeader, bb)
- } catch (ex: InvalidProtocolBufferException) {
- logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" }
- logger.debug("Cause", ex)
- null
- }
-
-
- companion object {
- private val logger = Logger(VesDecoder::class)
+ fun decode(bb: ByteBuf): VesMessage {
+ val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
+ return VesMessage(decodedHeader, bb)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 535fbe12..ac11b3e8 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -20,47 +20,43 @@
package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import java.util.concurrent.atomic.AtomicInteger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
internal class VesHvCollector(
- private val wireDecoder: WireDecoder,
+ private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder,
private val protobufDecoder: VesDecoder,
private val validator: MessageValidator,
private val router: Router,
private val sink: Sink) : Collector {
- override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
- dataStream
- .doOnNext(this::logIncomingMessage)
- .flatMap(this::decodeWire)
- .doOnNext(this::logDecodedWireMessage)
- .flatMap(this::decodeProtobuf)
- .filter(this::validate)
- .flatMap(this::findRoute)
- .compose(sink::send)
- .doOnNext(this::releaseMemory)
- .then()
- private fun logIncomingMessage(wire: ByteBuf) {
- logger.debug { "Got message with total ${wire.readableBytes()} B"}
- }
-
- private fun logDecodedWireMessage(payload: ByteBuf) {
- logger.debug { "Wire payload size: ${payload.readableBytes()} B"}
- }
-
- private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
-
- private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
+ override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
+ wireDecoderSupplier(alloc).let { wireDecoder ->
+ dataStream
+ .concatMap(wireDecoder::decode)
+ .filter(WireFrame::isValid)
+ .map(WireFrame::payload)
+ .map(protobufDecoder::decode)
+ .filter(this::validate)
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext(this::releaseMemory)
+ .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .then()
+ }
private fun validate(msg: VesMessage): Boolean {
val valid = validator.isValid(msg)
@@ -73,21 +69,16 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
private fun releaseMemory(msg: VesMessage) {
+ logger.trace { "Releasing memory from ${msg.rawMessage}" }
msg.rawMessage.release()
}
- private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
-
- private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
- val result = mapper(input)
- return if (result == null) {
- input.release()
- Mono.empty()
- } else {
- Mono.just(result)
- }
+ private fun releaseBuffersMemory(wireDecoder: WireDecoder) {
+ wireDecoder.release()
}
+ private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+
companion object {
val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 0aacb266..8e6db2af 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -19,24 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.common.serialization.ByteBufferSerializer
-import org.apache.kafka.common.serialization.StringSerializer
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.ipc.netty.http.client.HttpClient
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.nio.ByteBuffer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -51,33 +38,6 @@ object AdapterFactory {
override fun invoke() = Flux.just(config)
}
- private class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- val sender = KafkaSender.create(
- SenderOptions.create<CommonEventHeader, ByteBuffer>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
- .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java))
- return KafkaSink(sender)
- }
- }
-
-
- private class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return object : Sink {
- private val logger = Logger(LoggingSinkProvider::class)
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
- messages
- .doOnNext { msg ->
- logger.info { "Message routed to ${msg.topic}" }
- }
- .map { it.message }
-
- }
- }
- }
-
fun consulConfigurationProvider(url: String): ConfigurationProvider =
ConsulConfigurationProvider(url, httpAdapter())
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt
new file mode 100644
index 00000000..82452e1e
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteBufferSerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.ves.VesEventV5
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class KafkaSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ }
+
+ private fun constructSenderOptions(config: CollectorConfiguration) =
+ SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)
+
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
new file mode 100644
index 00000000..62b6d1aa
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class LoggingSinkProvider : SinkProvider {
+
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return object : Sink {
+ private val logger = Logger(LoggingSinkProvider::class)
+ private val totalMessages = AtomicLong()
+ private val totalBytes = AtomicLong()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+ messages
+ .doOnNext(this::logMessage)
+ .map { it.message }
+
+ private fun logMessage(msg: RoutedMessage) {
+ val msgs = totalMessages.addAndGet(1)
+ val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong())
+ val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
+ if (msgs % INFO_LOGGING_FREQ == 0L)
+ logger.info(logMessageSupplier)
+ else
+ logger.trace(logMessageSupplier)
+ }
+
+ }
+ }
+
+ companion object {
+ const val INFO_LOGGING_FREQ = 100_000
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 208b1ba0..564aa8df 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -59,17 +60,18 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
logger.debug("Got connection")
+ nettyOutbound.alloc()
val sendHello = nettyOutbound
.options { it.flushOnEach() }
.sendString(Mono.just("ONAP_VES_HV/0.1\n"))
.then()
- val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive())
+ val handleIncomingMessages = collectorProvider()
+ .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
return sendHello.then(handleIncomingMessages)
}
-
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
new file mode 100644
index 00000000..e4dd7cf6
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.CompositeByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import reactor.core.publisher.FluxSink
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Consumer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class StreamBufferEmitter(
+ private val streamBuffer: CompositeByteBuf,
+ private val newFrame: ByteBuf)
+ : Consumer<FluxSink<WireFrame>> {
+
+ private val subscribed = AtomicBoolean(false)
+
+ override fun accept(sink: FluxSink<WireFrame>) {
+ when {
+
+ subscribed.getAndSet(true) ->
+ sink.error(IllegalStateException("Wire frame emitter supports only one subscriber"))
+
+ newFrame.readableBytes() == 0 -> {
+ logger.trace { "Discarding empty buffer" }
+ newFrame.release()
+ sink.complete()
+ }
+
+ else -> {
+ streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
+ sink.onDispose {
+ logger.debug("Disposing read components")
+ streamBuffer.discardReadComponents()
+ }
+ sink.onRequest { requestedFrameCount ->
+ WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber()
+ }
+ }
+ }
+ }
+
+ companion object {
+ fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
+ Flux.create(StreamBufferEmitter(streamBuffer, newFrame))
+
+ private const val INCREASE_WRITER_INDEX = true
+ private val logger = Logger(StreamBufferEmitter::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt
index 6f6ac2a7..b701aaf2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt
@@ -17,28 +17,40 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl
+package org.onap.dcae.collectors.veshv.impl.wire
import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class WireDecoder {
- fun decode(byteBuf: ByteBuf): ByteBuf? =
- try {
- WireFrame.decode(byteBuf)
- .takeIf { it.isValid() }
- .let { it?.payload }
- } catch (ex: IndexOutOfBoundsException) {
- logger.debug { "Wire protocol frame could not be decoded - input is too small" }
- null
- }
+internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+ private val streamBuffer = alloc.compositeBuffer()
+
+ fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf)
+ .doOnSubscribe { logIncomingMessage(byteBuf) }
+ .doOnNext(this::logDecodedWireMessage)
+
+ fun release() {
+ streamBuffer.release()
+ }
+
+
+ private fun logIncomingMessage(wire: ByteBuf) {
+ logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+ }
+
+ private fun logDecodedWireMessage(wire: WireFrame) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+ }
companion object {
- private val logger = Logger(WireDecoder::class)
+ val logger = Logger(VesHvCollector::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
new file mode 100644
index 00000000..bc9c8389
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.FluxSink
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireFrameSink(
+ private val streamBuffer: ByteBuf,
+ private val sink: FluxSink<WireFrame>,
+ private val requestedFrameCount: Long) {
+
+ fun handleSubscriber() {
+ logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+
+ try {
+ if (requestedFrameCount == Long.MAX_VALUE) {
+ logger.trace { "Push based strategy" }
+ pushAvailableFrames()
+ } else {
+ logger.trace { "Pull based strategy - req $requestedFrameCount" }
+ pushUpToNumberOfFrames()
+ }
+ } catch (ex: Exception) {
+ sink.error(ex)
+ }
+
+ logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+
+ }
+
+ private fun pushAvailableFrames() {
+ var nextFrame = decodeFirstFrameFromBuffer()
+ while (nextFrame != null && !sink.isCancelled) {
+ sink.next(nextFrame)
+ nextFrame = decodeFirstFrameFromBuffer()
+ }
+ sink.complete()
+ }
+
+ private fun pushUpToNumberOfFrames() {
+ var nextFrame = decodeFirstFrameFromBuffer()
+ var remaining = requestedFrameCount
+ loop@ while (nextFrame != null && !sink.isCancelled) {
+ sink.next(nextFrame)
+ if (--remaining > 0) {
+ nextFrame = decodeFirstFrameFromBuffer()
+ } else {
+ break@loop
+ }
+ }
+ if (remaining > 0 && nextFrame == null) {
+ sink.complete()
+ }
+ }
+
+ private fun decodeFirstFrameFromBuffer(): WireFrame? =
+ try {
+ WireFrame.decodeFirst(streamBuffer)
+ } catch (ex: MissingWireFrameBytesException) {
+ logger.debug { "${ex.message} - waiting for more data" }
+ null
+ }
+
+ companion object {
+ private val logger = Logger(WireFrameSink::class)
+ }
+}