aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt27
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt20
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt119
3 files changed, 31 insertions, 135 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index f608a2b9..8970e03e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.publisher.SynchronousSink
-import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,7 +48,7 @@ internal class VesHvCollector(
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(PayloadWireFrameMessage::isValid)
+ .filter(WireFrameMessage::isValid)
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
@@ -62,14 +57,13 @@ internal class VesHvCollector(
.then()
}
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(decoder::decode)
- .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
- .map(PayloadWireFrameMessage::payload)
+ private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ .map(WireFrameMessage::payload)
.map(protobufDecoder::decode)
.flatMap { omitWhenNone(it) }
@@ -95,18 +89,5 @@ internal class VesHvCollector(
companion object {
private val logger = Logger(VesHvCollector::class)
-
- private val completeStreamOnEOT by lazy {
- BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
- when (frame) {
- is EndOfTransmissionMessage -> {
- logger.info("Completing stream because of receiving EOT message")
- sink.complete()
- }
- is PayloadWireFrameMessage -> sink.next(frame)
- else -> sink.error(UnknownWireFrameTypeException(frame))
- }
- }
- }
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 80f62d1a..0775c652 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -76,15 +74,9 @@ internal class WireChunkDecoder(
}
private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- when (frame) {
- is PayloadWireFrameMessage -> IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
- is EndOfTransmissionMessage -> IO {
- logEndOfTransmissionWireMessage()
- next.next(frame)
- }
+ IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
}
@@ -92,14 +84,10 @@ internal class WireChunkDecoder(
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ private fun logDecodedWireMessage(wire: WireFrameMessage) {
logger.trace { "Wire payload size: ${wire.payloadSize} B" }
}
- private fun logEndOfTransmissionWireMessage() {
- logger.trace { "Received end-of-transmission message" }
- }
-
private fun logEndOfData() {
logger.trace { "End of data in current TCP buffer" }
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index a9364ed3..d214ffcf 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -27,13 +27,10 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.test.test
-import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -46,7 +43,7 @@ internal object WireChunkDecoderTest : Spek({
val encoder = WireFrameEncoder(alloc)
- fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
+ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
@@ -101,23 +98,23 @@ internal object WireChunkDecoderTest : Spek({
}
given("valid input") {
- val input = PayloadWireFrameMessage(samplePayload)
+ val input = WireFrameMessage(samplePayload)
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
}
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
- .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -126,30 +123,14 @@ internal object WireChunkDecoderTest : Spek({
}
}
- given("end-of-transmission marker byte with garbage after it") {
- val input = Unpooled.buffer()
- .writeByte(0xAA)
- .writeBytes(Unpooled.wrappedBuffer(samplePayload))
-
- it("should yield decoded end-of-transmission frame and error") {
- createInstance().decode(input).test()
- .expectNextMatches { it is EndOfTransmissionMessage }
- .verifyError(WireFrameException::class.java)
- }
-
- it("should leave memory unreleased") {
- verifyMemoryNotReleased(input)
- }
- }
-
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyError(WireFrameException::class.java)
}
@@ -159,16 +140,16 @@ internal object WireChunkDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
+ val input2 = encoder.encode(WireFrameMessage(anotherPayload))
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -177,57 +158,15 @@ internal object WireChunkDecoderTest : Spek({
}
}
- given("two payload messages followed by end-of-transmission marker byte") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
-
- val input = Unpooled.buffer()
- .writeBytes(frame1)
- .writeBytes(frame2)
- .writeByte(0xAA)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
- .expectNextMatches { it is EndOfTransmissionMessage }
- .verifyComplete()
- }
- }
-
- given("two payload messages separated by end-of-transmission marker byte") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
-
- val input = Unpooled.buffer()
- .writeBytes(frame1)
- .writeByte(0xAA)
- .writeBytes(frame2)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
- .expectNextMatches { it is EndOfTransmissionMessage }
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
- .verifyComplete()
- }
-
- it("should release memory") {
- verifyMemoryReleased(input)
- }
- }
-
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1)
.test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
.verifyError(WireFrameException::class.java)
@@ -244,8 +183,8 @@ internal object WireChunkDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -255,10 +194,10 @@ internal object WireChunkDecoderTest : Spek({
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -268,8 +207,8 @@ internal object WireChunkDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -282,8 +221,8 @@ internal object WireChunkDecoderTest : Spek({
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -292,16 +231,4 @@ internal object WireChunkDecoderTest : Spek({
}
}
}
-})
-
-
-private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
- if (msg is PayloadWireFrameMessage) {
- msg
- } else {
- fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
- }
-
-private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
- this as? EndOfTransmissionMessage
- ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") \ No newline at end of file
+}) \ No newline at end of file