diff options
5 files changed, 110 insertions, 14 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 49eeddaa..d917c71a 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -57,9 +57,11 @@ object VesHvSpecification : Spek({ val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) val msgWithInvalidFrame = invalidWireFrame() + val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS) val expectedRefCnt = 0 - val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame) + val handledEvents = sut.handleConnection( + sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload) assertThat(handledEvents).hasSize(1) @@ -72,6 +74,9 @@ object VesHvSpecification : Spek({ assertThat(msgWithInvalidFrame.refCnt()) .describedAs("message with invalid frame should be released") .isEqualTo(expectedRefCnt) + assertThat(msgWithTooBigPayload.refCnt()) + .describedAs("message with payload exceeding 1MiB should be released") + .isEqualTo(expectedRefCnt) } @@ -148,4 +153,20 @@ object VesHvSpecification : Spek({ assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } + + describe("request validation") { + it("should reject message with payload greater than 1 MiB and all subsequent messages") { + val sink = StoringSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val handledMessages = sut.handleConnection(sink, + vesMessage(Domain.HVRANMEAS, "first"), + vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"), + vesMessage(Domain.HVRANMEAS, "third")) + + assertThat(handledMessages).hasSize(1) + assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") + } + } }) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt index 8895d642..e620e6b9 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt @@ -20,8 +20,12 @@ package org.onap.dcae.collectors.veshv.tests.component import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.* +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE +import org.onap.ves.HVRanMeasFieldsV5 import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -29,7 +33,7 @@ import java.util.* val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT -fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = allocator.buffer().run { +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run { writeByte(0xFF) // always 0xFF writeByte(0x01) // version writeByte(0x01) // content type = GPB @@ -40,7 +44,7 @@ fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toS } -fun invalidVesMessage() = allocator.buffer().run { +fun invalidVesMessage(): ByteBuf = allocator.buffer().run { writeByte(0xFF) // always 0xFF writeByte(0x01) // version writeByte(0x01) // content type = GPB @@ -51,17 +55,32 @@ fun invalidVesMessage() = allocator.buffer().run { } -fun garbageFrame() = allocator.buffer().run { +fun garbageFrame(): ByteBuf = allocator.buffer().run { writeBytes("the meaning of life is &@)(*_!".toByteArray()) } -fun invalidWireFrame() = allocator.buffer().run { +fun invalidWireFrame(): ByteBuf = allocator.buffer().run { writeByte(0xFF) writeByte(0x01) // version writeByte(0x01) // content type = GPB } -fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) = +fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB + + val gpb = vesEvent( + domain, + id, + ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) + ).toByteString().asReadOnlyByteBuffer() + + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes +} + +fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) = VesEvent.newBuilder() .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() @@ -76,5 +95,5 @@ fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().t .setStartEpochMicrosec(120034455) .setLastEpochMicrosec(120034459) .setSequence(1)) - .setHvRanMeasFields(ByteString.EMPTY) + .setHvRanMeasFields(hvRanMeasFields) .build() diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index 22767ed3..39841d6a 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -52,9 +52,9 @@ class WireFrameDecoder { fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> = when { - isEmpty(byteBuf) -> Left(EmptyWireFrame) + isEmpty(byteBuf) -> Left(EmptyWireFrame) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) - else -> parseFrame(byteBuf) + else -> parseFrame(byteBuf) } private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE @@ -74,6 +74,11 @@ class WireFrameDecoder { val payloadTypeRaw = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() + + if (payloadSize > MAX_PAYLOAD_SIZE) { + return Left(PayloadSizeExceeded) + } + if (byteBuf.readableBytes() < payloadSize) { byteBuf.resetReaderIndex() return Left(MissingWireFramePayloadBytes) @@ -83,4 +88,8 @@ class WireFrameDecoder { return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize)) } + + companion object { + const val MAX_PAYLOAD_SIZE = 1024 * 1024 + } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index fb225202..626bf329 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.domain +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE + /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 @@ -35,6 +37,7 @@ class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) +object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") // Missing bytes errors diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index a97d889c..4d6f0716 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -30,7 +30,9 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset +import kotlin.test.assertTrue /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -153,7 +155,7 @@ object WireFrameCodecsTest : Spek({ assertThat(buff.readableBytes()).isEqualTo(1) } - it("should throw exception when not even header fits") { + it("should return error when not even header fits") { val buff = Unpooled.buffer() .writeByte(0xFF) @@ -161,7 +163,7 @@ object WireFrameCodecsTest : Spek({ } - it("should throw exception when first byte is not 0xFF but length looks ok") { + it("should return error when first byte is not 0xFF but length looks ok") { val buff = Unpooled.buffer() .writeByte(0xAA) .writeBytes("some garbage".toByteArray()) @@ -169,14 +171,14 @@ object WireFrameCodecsTest : Spek({ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } } - it("should throw exception when first byte is not 0xFF and length is to short") { + it("should return error when first byte is not 0xFF and length is to short") { val buff = Unpooled.buffer() .writeByte(0xAA) decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } } - it("should throw exception when payload doesn't fit") { + it("should return error when payload doesn't fit") { val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) buff.writerIndex(buff.writerIndex() - 2) @@ -185,8 +187,50 @@ object WireFrameCodecsTest : Spek({ } } - } + describe("payload size limit"){ + + it("should decode successfully when payload size is equal 1 MiB") { + + val payload = ByteArray(MAX_PAYLOAD_SIZE) + val input = WireFrame( + payload = ByteData(payload), + version = 1, + payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + + + assertTrue(decoder.decodeFirst(encoder.encode(input)).isRight()) + } + + it("should return error when payload exceeds 1 MiB") { + + val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) + val input = WireFrame( + payload = ByteData(payload), + version = 1, + payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + + + decoder.decodeFirst(encoder.encode(input)) + .assertFailedWithError { it.isInstanceOf(PayloadSizeExceeded::class.java) } + } + + it("should validate only first message") { + + val payload = ByteArray(MAX_PAYLOAD_SIZE) + val input = WireFrame( + payload = ByteData(payload), + version = 1, + payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadSize = payload.size) + + + assertTrue(decoder.decodeFirst(encoder.encode(input).writeByte(0xFF)).isRight()) + } + } + } }) private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) { |