summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt35
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt121
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt102
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt53
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt)41
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt50
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt15
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt90
-rw-r--r--hv-collector-xnf-simulator/sample-request.json20
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt8
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt43
16 files changed, 407 insertions, 210 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index d6158481..ff997173 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 3246cf59..ceae78c9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,13 +25,18 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.SynchronousSink
+import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,9 +55,10 @@ internal class VesHvCollector(
dataStream
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireDecoder::decode)
+ .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- .filter(WireFrame::isValid)
- .map(WireFrame::payload)
+ .filter(PayloadWireFrameMessage::isValid)
+ .map(PayloadWireFrameMessage::payload)
.map(protobufDecoder::decode)
.filter(validator::isValid)
.flatMap(this::findRoute)
@@ -76,11 +82,22 @@ internal class VesHvCollector(
return Flux.empty()
}
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
- wireChunkDecoder.release()
- }
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
companion object {
private val logger = Logger(VesHvCollector::class)
+
+ private val completeStreamOnEOT by lazy {
+ BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
+ when (frame) {
+ is EndOfTransmissionMessage -> {
+ logger.info("Completing stream because of receiving EOT message")
+ sink.complete()
+ }
+ is PayloadWireFrameMessage -> sink.next(frame)
+ else -> sink.error(UnknownWireFrameTypeException(frame))
+ }
+ }
+ }
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 0426ceb1..e9985766 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -75,7 +75,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
onReadIdle(timeout.toMillis()) {
logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
context().channel().close().addListener {
-
if (it.isSuccess)
logger.debug { "Client disconnected because of idle timeout" }
else
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 502505c4..fbff769f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -22,11 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.*
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -44,7 +40,7 @@ internal class WireChunkDecoder(
streamBuffer.release()
}
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+ fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
logIncomingMessage(byteBuf)
if (byteBuf.readableBytes() == 0) {
byteBuf.release()
@@ -55,13 +51,13 @@ internal class WireChunkDecoder(
}
}
- private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+ private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
decoder.decodeFirst(streamBuffer)
.fold(onError(next), onSuccess(next))
.unsafeRunSync()
}
- private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
when (err) {
is InvalidWireFrame -> IO {
next.error(WireFrameException(err))
@@ -73,20 +69,29 @@ internal class WireChunkDecoder(
}
}
- private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
+ when (frame) {
+ is PayloadWireFrameMessage -> IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ is EndOfTransmissionMessage -> IO {
+ logEndOfTransmissionWireMessage()
+ next.next(frame)
+ }
}
}
-
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: WireFrame) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+ private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ }
+
+ private fun logEndOfTransmissionWireMessage() {
+ logger.trace { "Received end-of-transmission message" }
}
private fun logEndOfData() {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index 33f71684..a9364ed3 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -27,10 +27,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.test.test
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -43,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
val encoder = WireFrameEncoder(alloc)
- fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+ fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
@@ -98,23 +101,23 @@ internal object WireChunkDecoderTest : Spek({
}
given("valid input") {
- val input = WireFrame(samplePayload)
+ val input = PayloadWireFrameMessage(samplePayload)
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
}
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
- .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -123,14 +126,30 @@ internal object WireChunkDecoderTest : Spek({
}
}
+ given("end-of-transmission marker byte with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded end-of-transmission frame and error") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrame(samplePayload)))
+ .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyError(WireFrameException::class.java)
}
@@ -140,16 +159,16 @@ internal object WireChunkDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = encoder.encode(WireFrame(samplePayload))
- val input2 = encoder.encode(WireFrame(anotherPayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -158,15 +177,57 @@ internal object WireChunkDecoderTest : Spek({
}
}
+ given("two payload messages followed by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+ .writeByte(0xAA)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .verifyComplete()
+ }
+ }
+
+ given("two payload messages separated by end-of-transmission marker byte") {
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+
+ val input = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeByte(0xAA)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input).test()
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it is EndOfTransmissionMessage }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(WireFrame(samplePayload))
+ val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1)
.test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
.verifyError(WireFrameException::class.java)
@@ -183,8 +244,8 @@ internal object WireChunkDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -194,10 +255,10 @@ internal object WireChunkDecoderTest : Spek({
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -207,8 +268,8 @@ internal object WireChunkDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(WireFrame(samplePayload))
- val frame2 = encoder.encode(WireFrame(anotherPayload))
+ val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -221,8 +282,8 @@ internal object WireChunkDecoderTest : Spek({
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -232,3 +293,15 @@ internal object WireChunkDecoderTest : Spek({
}
}
})
+
+
+private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
+ if (msg is PayloadWireFrameMessage) {
+ msg
+ } else {
+ fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
+ }
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") \ No newline at end of file
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 246fc7ed..5e6e666f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -37,22 +37,42 @@ object VesHvSpecification : Spek({
describe("VES High Volume Collector") {
it("should handle multiple HV RAN events") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
assertThat(messages)
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should not handle messages received from client after end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage,
+ anotherValidMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("first message should be released")
+ .isEqualTo(0)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(0)
+ assertThat(anotherValidMessage.refCnt())
+ .describedAs("second (not handled) message should not be released")
+ .isEqualTo(1)
+ }
}
describe("Memory management") {
it("should release memory for each handled and dropped message") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
@@ -76,13 +96,30 @@ object VesHvSpecification : Spek({
assertThat(msgWithTooBigPayload.refCnt())
.describedAs("message with payload exceeding 1MiB should be released")
.isEqualTo(expectedRefCnt)
+ }
+ it("should release memory for end-of-transmission message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionMessage()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink,
+ validMessage,
+ endOfTransmissionMessage
+ )
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(endOfTransmissionMessage.refCnt())
+ .describedAs("end-of-transmission message should be released")
+ .isEqualTo(expectedRefCnt)
}
it("should release memory for each message with invalid payload") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidPayload = invalidVesMessage()
val expectedRefCnt = 0
@@ -101,9 +138,7 @@ object VesHvSpecification : Spek({
}
it("should release memory for each message with garbage frame") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
@@ -124,9 +159,7 @@ object VesHvSpecification : Spek({
describe("message routing") {
it("should direct message to a topic by means of routing configuration") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
@@ -137,8 +170,7 @@ object VesHvSpecification : Spek({
}
it("should be able to direct 2 messages from different domains to one topic") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
@@ -149,20 +181,18 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages.get(0).topic).describedAs("first message topic")
+ assertThat(messages[0].topic).describedAs("first message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(1).topic).describedAs("second message topic")
+ assertThat(messages[1].topic).describedAs("second message topic")
.isEqualTo(HVRANMEAS_TOPIC)
- assertThat(messages.get(2).topic).describedAs("last message topic")
+ assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
it("should drop message if route was not found") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
vesMessage(Domain.OTHER, "first"),
vesMessage(Domain.HVRANMEAS, "second"),
@@ -181,8 +211,7 @@ object VesHvSpecification : Spek({
val defaultTimeout = Duration.ofSeconds(10)
it("should update collector on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
val firstCollector = sut.collector
@@ -195,8 +224,7 @@ object VesHvSpecification : Spek({
}
it("should start routing messages on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
@@ -216,8 +244,7 @@ object VesHvSpecification : Spek({
}
it("should change domain routing on configuration change") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -244,8 +271,7 @@ object VesHvSpecification : Spek({
}
it("should update routing for each client sending one message") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -274,8 +300,7 @@ object VesHvSpecification : Spek({
it("should not update routing for client sending continuous stream of messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
+ val (sut, sink) = vesHvWithStoringSink()
sut.configurationProvider.updateConfiguration(basicConfiguration)
@@ -311,9 +336,7 @@ object VesHvSpecification : Spek({
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
vesMessage(Domain.HVRANMEAS, "first"),
@@ -326,3 +349,10 @@ object VesHvSpecification : Spek({
}
})
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ return Pair(sut, sink)
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
index e620e6b9..64b4ba26 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
@@ -23,9 +23,7 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.*
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
-import org.onap.ves.HVRanMeasFieldsV5
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -33,15 +31,19 @@ import java.util.*
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
+
+fun endOfTransmissionMessage(): ByteBuf =
+ allocator.buffer().writeByte(0xAA)
fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
@@ -65,22 +67,25 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // content type = GPB
}
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
+ allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
- val gpb = vesEvent(
- domain,
- id,
- ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
- ).toByteString().asReadOnlyByteBuffer()
+ val gpb = vesEvent(
+ domain,
+ id,
+ ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ ).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
-}
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+ }
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS,
+ id: String = UUID.randomUUID().toString(),
+ hvRanMeasFields: ByteString = ByteString.EMPTY) =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt
index db6e1070..4811a2b4 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameMessages.kt
@@ -19,6 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.domain
+
+sealed class WireFrameMessage
+
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
*
@@ -49,10 +52,11 @@ package org.onap.dcae.collectors.veshv.domain
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class WireFrame(val payload: ByteData,
- val version: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int) {
+data class PayloadWireFrameMessage(val payload: ByteData,
+ val version: Short,
+ val payloadTypeRaw: Short,
+ val payloadSize: Int
+) : WireFrameMessage() {
constructor(payload: ByteArray) : this(
ByteData(payload),
@@ -66,11 +70,38 @@ data class WireFrame(val payload: ByteData,
&& payload.size() == payloadSize
companion object {
+ const val MARKER_BYTE: Short = 0xFF
+
const val SUPPORTED_VERSION: Short = 1
const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
1 * java.lang.Integer.BYTES
- const val MARKER_BYTE: Short = 0xFF
+
+ const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
+
+
+/**
+ * This message type should be used by client to indicate that he has finished sending data to collector.
+ *
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ * ┌─────┬───────────────────────┐
+ * │octet│ 0 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * │ bit │ 0│ │ │ │ │ │ │ │
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ 0xAA │
+ * └─────┴───────────────────────┘
+ * ```
+ *
+ * @since July 2018
+ */
+
+object EndOfTransmissionMessage : WireFrameMessage() {
+ const val MARKER_BYTE: Short = 0xAA
+}
+
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 39841d6a..ab82dc04 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,6 +24,7 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -31,10 +32,10 @@ import io.netty.buffer.ByteBufAllocator
*/
class WireFrameEncoder(val allocator: ByteBufAllocator) {
- fun encode(frame: WireFrame): ByteBuf {
- val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+ fun encode(frame: PayloadWireFrameMessage): ByteBuf {
+ val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
- bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+ bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
bb.writeByte(frame.version.toInt())
bb.writeByte(frame.payloadTypeRaw.toInt())
bb.writeInt(frame.payloadSize)
@@ -50,32 +51,54 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) {
*/
class WireFrameDecoder {
- fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+ fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
- else -> parseFrame(byteBuf)
+ else -> parseWireFrame(byteBuf)
}
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
-
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+ private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
+
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
+
+ private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
byteBuf.markReaderIndex()
+ val byte = byteBuf.readUnsignedByte()
- val mark = byteBuf.readUnsignedByte()
- if (mark != WireFrame.MARKER_BYTE) {
+ return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
+ Right(EndOfTransmissionMessage)
+ } else {
byteBuf.resetReaderIndex()
- return Left(InvalidWireFrameMarker(mark))
+ Left(MissingWireFrameHeaderBytes)
+ }
+ }
+
+ private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
+ byteBuf.markReaderIndex()
+
+ val mark = byteBuf.readUnsignedByte()
+ return when (mark) {
+ EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
+ PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ else -> {
+ byteBuf.resetReaderIndex()
+ Left(InvalidWireFrameMarker(mark))
+ }
}
+ }
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
val version = byteBuf.readUnsignedByte()
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
if (payloadSize > MAX_PAYLOAD_SIZE) {
+ byteBuf.resetReaderIndex()
return Left(PayloadSizeExceeded)
}
@@ -86,10 +109,7 @@ class WireFrameDecoder {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
- }
+ return Right(PayloadWireFrameMessage(payload, version, payloadTypeRaw, payloadSize))
- companion object {
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index 626bf329..d82bb25f 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,9 +33,10 @@ sealed class WireFrameDecodingError(val message: String)
sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
-class InvalidWireFrameMarker(actualMarker: Short)
- : InvalidWireFrame(
- "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X"
+ .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -46,3 +47,9 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+
+// Other
+
+class UnknownWireFrameTypeException(frame: WireFrameMessage)
+ : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 4d6f0716..a5242e0f 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -20,19 +20,18 @@
package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
-import arrow.core.identity
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
+import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -44,7 +43,7 @@ object WireFrameCodecsTest : Spek({
val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
+ PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
@@ -54,7 +53,7 @@ object WireFrameCodecsTest : Spek({
describe("Wire Frame invariants") {
given("input with unsupported version") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 100,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -66,7 +65,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported payload type") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData.EMPTY,
version = 1,
payloadTypeRaw = 0x69,
@@ -78,7 +77,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too small payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -90,7 +89,7 @@ object WireFrameCodecsTest : Spek({
}
given("input with too big payload size") {
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -103,7 +102,7 @@ object WireFrameCodecsTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -122,7 +121,7 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getOrFail()
+ val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
it("should decode version") {
assertThat(decoded.version).isEqualTo(frame.version)
@@ -142,40 +141,52 @@ object WireFrameCodecsTest : Spek({
}
}
+
describe("TCP framing") {
// see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
- it("should decode message leaving rest unread") {
+ it("should return error when buffer is empty") {
+ val buff = Unpooled.buffer()
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
+ }
+
+ it("should return end-of-transmission message when given end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getOrFail()
- assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
- assertThat(buff.readableBytes()).isEqualTo(1)
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
- it("should return error when not even header fits") {
+ it("should return error when given any single byte other than end-of-transmission marker byte") {
val buff = Unpooled.buffer()
- .writeByte(0xFF)
+ .writeByte(0xEE)
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ }
+ it("should return error when payload message header does not fit") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+ .writeBytes("MOMOM".toByteArray())
+
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
- it("should return error when first byte is not 0xFF but length looks ok") {
+ it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
val buff = Unpooled.buffer()
- .writeByte(0xAA)
+ .writeByte(0x69)
.writeBytes("some garbage".toByteArray())
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
- it("should return error when first byte is not 0xFF and length is to short") {
+ it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
- decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+ assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
}
it("should return error when payload doesn't fit") {
@@ -186,14 +197,23 @@ object WireFrameCodecsTest : Spek({
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
}
+ it("should decode payload message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
}
- describe("payload size limit"){
+ describe("payload size limit") {
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -206,7 +226,7 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -220,7 +240,7 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = WireFrame(
+ val input = PayloadWireFrameMessage(
payload = ByteData(payload),
version = 1,
payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
@@ -237,5 +257,21 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
- fold({ fail(it.message) }, ::identity) as WireFrame
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
+ fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
+
+private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
+ this as? PayloadWireFrameMessage
+ ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
+
+
+private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
+ decoded.getEndOfTransmissionMessageOrFail()
+}
+
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+
+private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
+ this as? EndOfTransmissionMessage
+ ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json
deleted file mode 100644
index ca8bd885..00000000
--- a/hv-collector-xnf-simulator/sample-request.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "commonEventHeader": {
- "version": "sample-version",
- "domain": 10,
- "sequence": 1,
- "priority": 1,
- "eventId": "sample-event-id",
- "eventName": "sample-event-name",
- "eventType": "sample-event-type",
- "startEpochMicrosec": 120034455,
- "lastEpochMicrosec": 120034455,
- "nfNamingCode": "sample-nf-naming-code",
- "nfcNamingCode": "sample-nfc-naming-code",
- "reportingEntityId": "sample-reporting-entity-id",
- "reportingEntityName": "sample-reporting-entity-name",
- "sourceId": "sample-source-id",
- "sourceName": "sample-source-name"
- },
- "messagesAmount": 25000
-}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
index f4c92fd4..a6d6af84 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.api
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import reactor.core.publisher.Flux
@@ -28,5 +28,5 @@ import reactor.core.publisher.Flux
* @since June 2018
*/
interface MessageGenerator {
- fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
+ fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index b67bc644..6346b648 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import ratpack.exec.Promise
@@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
}
}
- private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
return ctx.request.body
.map { Json.createReader(it.inputStream).readObject() }
.map { extractMessageParameters(it) }
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
index 0d28bad0..baff967a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.ves.VesEventV5.VesEvent
@@ -35,7 +35,7 @@ import javax.json.JsonObject
*/
internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+ override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
if (messageParameters.amount < 0)
it.repeat()
@@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat
.build()
- private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
- WireFrame(vesMessageBytes(commonHeader))
+ private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
+ PayloadWireFrameMessage(vesMessageBytes(commonHeader))
private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 6487888e..2f9e0b59 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -20,13 +20,13 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
}
.build()
- fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
sendRx(messages).block()
}
- fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+ fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
.newHandler { _, output -> handler(complete, messages, output) }
@@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
return complete.then()
}
- private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound):
Publisher<Void> {
- val encoder = WireFrameEncoder(nettyOutbound.alloc())
- val context = nettyOutbound.context()
-
- context.onClose {
- logger.info { "Connection to ${context.address()} has been closed" }
- }
-
- // TODO: Close channel after all messages have been sent
- // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
-// complete.subscribe {
-// context.channel().disconnect().addListener {
-// if (it.isSuccess)
-// logger.info { "Connection closed" }
-// else
-// logger.warn("Failed to close the connection", it.cause())
-// }
-// }
-
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
val frames = messages
.map(encoder::encode)
.window(MAX_BATCH_SIZE)
return nettyOutbound
+ .logConnectionClosed()
.options { it.flushOnBoundary() }
.sendGroups(frames)
- .send(Mono.just(Unpooled.EMPTY_BUFFER))
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
.then {
logger.info("Messages have been sent")
complete.onComplete()
@@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
.clientAuth(ClientAuth.REQUIRE)
.build()
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
companion object {
private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
private val logger = Logger(XnfSimulator::class)
}
}