aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-core/src/main/kotlin
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt35
4 files changed, 43 insertions, 23 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index d6158481..ff997173 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 3246cf59..ceae78c9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,13 +25,18 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.SynchronousSink
+import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,9 +55,10 @@ internal class VesHvCollector(
dataStream
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireDecoder::decode)
+ .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- .filter(WireFrame::isValid)
- .map(WireFrame::payload)
+ .filter(PayloadWireFrameMessage::isValid)
+ .map(PayloadWireFrameMessage::payload)
.map(protobufDecoder::decode)
.filter(validator::isValid)
.flatMap(this::findRoute)
@@ -76,11 +82,22 @@ internal class VesHvCollector(
return Flux.empty()
}
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
- wireChunkDecoder.release()
- }
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
companion object {
private val logger = Logger(VesHvCollector::class)
+
+ private val completeStreamOnEOT by lazy {
+ BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
+ when (frame) {
+ is EndOfTransmissionMessage -> {
+ logger.info("Completing stream because of receiving EOT message")
+ sink.complete()
+ }
+ is PayloadWireFrameMessage -> sink.next(frame)
+ else -> sink.error(UnknownWireFrameTypeException(frame))
+ }
+ }
+ }
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 0426ceb1..e9985766 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -75,7 +75,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
onReadIdle(timeout.toMillis()) {
logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
context().channel().close().addListener {
-
if (it.isSuccess)
logger.debug { "Client disconnected because of idle timeout" }
else
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 502505c4..fbff769f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -22,11 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.*
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -44,7 +40,7 @@ internal class WireChunkDecoder(
streamBuffer.release()
}
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+ fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
logIncomingMessage(byteBuf)
if (byteBuf.readableBytes() == 0) {
byteBuf.release()
@@ -55,13 +51,13 @@ internal class WireChunkDecoder(
}
}
- private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+ private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
.unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
when (err) {
is InvalidWireFrame -> IO {
next.error(WireFrameException(err))
@@ -73,20 +69,29 @@ internal class WireChunkDecoder(
}
}
- private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
+ when (frame) {
+ is PayloadWireFrameMessage -> IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ is EndOfTransmissionMessage -> IO {
+ logEndOfTransmissionWireMessage()
+ next.next(frame)
+ }
}
}
-
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: WireFrame) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+ private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ }
+
+ private fun logEndOfTransmissionWireMessage() {
+ logger.trace { "Received end-of-transmission message" }
}
private fun logEndOfData() {