summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt23
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt31
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt13
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt3
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt54
5 files changed, 110 insertions, 14 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 49eeddaa..d917c71a 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -57,9 +57,11 @@ object VesHvSpecification : Spek({
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
+ val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
val expectedRefCnt = 0
- val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
+ val handledEvents = sut.handleConnection(
+ sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
assertThat(handledEvents).hasSize(1)
@@ -72,6 +74,9 @@ object VesHvSpecification : Spek({
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
.isEqualTo(expectedRefCnt)
+ assertThat(msgWithTooBigPayload.refCnt())
+ .describedAs("message with payload exceeding 1MiB should be released")
+ .isEqualTo(expectedRefCnt)
}
@@ -148,4 +153,20 @@ object VesHvSpecification : Spek({
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
+
+ describe("request validation") {
+ it("should reject message with payload greater than 1 MiB and all subsequent messages") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val handledMessages = sut.handleConnection(sink,
+ vesMessage(Domain.HVRANMEAS, "first"),
+ vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
+ vesMessage(Domain.HVRANMEAS, "third"))
+
+ assertThat(handledMessages).hasSize(1)
+ assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
+ }
+ }
})
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
index 8895d642..e620e6b9 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
@@ -20,8 +20,12 @@
package org.onap.dcae.collectors.veshv.tests.component
import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.*
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.ves.HVRanMeasFieldsV5
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -29,7 +33,7 @@ import java.util.*
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = allocator.buffer().run {
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
writeByte(0xFF) // always 0xFF
writeByte(0x01) // version
writeByte(0x01) // content type = GPB
@@ -40,7 +44,7 @@ fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toS
}
-fun invalidVesMessage() = allocator.buffer().run {
+fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
writeByte(0xFF) // always 0xFF
writeByte(0x01) // version
writeByte(0x01) // content type = GPB
@@ -51,17 +55,32 @@ fun invalidVesMessage() = allocator.buffer().run {
}
-fun garbageFrame() = allocator.buffer().run {
+fun garbageFrame(): ByteBuf = allocator.buffer().run {
writeBytes("the meaning of life is &@)(*_!".toByteArray())
}
-fun invalidWireFrame() = allocator.buffer().run {
+fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0xFF)
writeByte(0x01) // version
writeByte(0x01) // content type = GPB
}
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) =
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(0x01) // version
+ writeByte(0x01) // content type = GPB
+
+ val gpb = vesEvent(
+ domain,
+ id,
+ ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ ).toByteString().asReadOnlyByteBuffer()
+
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+}
+
+fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
@@ -76,5 +95,5 @@ fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().t
.setStartEpochMicrosec(120034455)
.setLastEpochMicrosec(120034459)
.setSequence(1))
- .setHvRanMeasFields(ByteString.EMPTY)
+ .setHvRanMeasFields(hvRanMeasFields)
.build()
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 22767ed3..39841d6a 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -52,9 +52,9 @@ class WireFrameDecoder {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
when {
- isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ isEmpty(byteBuf) -> Left(EmptyWireFrame)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
- else -> parseFrame(byteBuf)
+ else -> parseFrame(byteBuf)
}
private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
@@ -74,6 +74,11 @@ class WireFrameDecoder {
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
+
+ if (payloadSize > MAX_PAYLOAD_SIZE) {
+ return Left(PayloadSizeExceeded)
+ }
+
if (byteBuf.readableBytes() < payloadSize) {
byteBuf.resetReaderIndex()
return Left(MissingWireFramePayloadBytes)
@@ -83,4 +88,8 @@ class WireFrameDecoder {
return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
}
+
+ companion object {
+ const val MAX_PAYLOAD_SIZE = 1024 * 1024
+ }
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index fb225202..626bf329 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.domain
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
@@ -35,6 +37,7 @@ class InvalidWireFrameMarker(actualMarker: Short)
: InvalidWireFrame(
"Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
// Missing bytes errors
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index a97d889c..4d6f0716 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -30,7 +30,9 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
+import kotlin.test.assertTrue
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -153,7 +155,7 @@ object WireFrameCodecsTest : Spek({
assertThat(buff.readableBytes()).isEqualTo(1)
}
- it("should throw exception when not even header fits") {
+ it("should return error when not even header fits") {
val buff = Unpooled.buffer()
.writeByte(0xFF)
@@ -161,7 +163,7 @@ object WireFrameCodecsTest : Spek({
}
- it("should throw exception when first byte is not 0xFF but length looks ok") {
+ it("should return error when first byte is not 0xFF but length looks ok") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
.writeBytes("some garbage".toByteArray())
@@ -169,14 +171,14 @@ object WireFrameCodecsTest : Spek({
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
- it("should throw exception when first byte is not 0xFF and length is to short") {
+ it("should return error when first byte is not 0xFF and length is to short") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
- it("should throw exception when payload doesn't fit") {
+ it("should return error when payload doesn't fit") {
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
buff.writerIndex(buff.writerIndex() - 2)
@@ -185,8 +187,50 @@ object WireFrameCodecsTest : Spek({
}
}
- }
+ describe("payload size limit"){
+
+ it("should decode successfully when payload size is equal 1 MiB") {
+
+ val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val input = WireFrame(
+ payload = ByteData(payload),
+ version = 1,
+ payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+
+
+ assertTrue(decoder.decodeFirst(encoder.encode(input)).isRight())
+ }
+
+ it("should return error when payload exceeds 1 MiB") {
+
+ val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
+ val input = WireFrame(
+ payload = ByteData(payload),
+ version = 1,
+ payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+
+
+ decoder.decodeFirst(encoder.encode(input))
+ .assertFailedWithError { it.isInstanceOf(PayloadSizeExceeded::class.java) }
+ }
+
+ it("should validate only first message") {
+
+ val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val input = WireFrame(
+ payload = ByteData(payload),
+ version = 1,
+ payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadSize = payload.size)
+
+
+ assertTrue(decoder.decodeFirst(encoder.encode(input).writeByte(0xFF)).isRight())
+ }
+ }
+ }
})
private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {