summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt42
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt)7
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt11
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt9
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt (renamed from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt)0
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt)25
-rw-r--r--hv-collector-domain/pom.xml4
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt62
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt (renamed from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt)23
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt26
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt33
12 files changed, 126 insertions, 120 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index d1d72592..502505c4 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -19,14 +19,17 @@
*/
package org.onap.dcae.collectors.veshv.impl.wire
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
+import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
+import reactor.core.publisher.SynchronousSink
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,28 +56,29 @@ internal class WireChunkDecoder(
}
private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
- try {
- val frame = decodeFirstFrameFromBuffer()
- if (frame == null) {
+ decoder.decodeFirst(streamBuffer)
+ .fold(onError(next), onSuccess(next))
+ .unsafeRunSync()
+ }
+
+ private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ when (err) {
+ is InvalidWireFrame -> IO {
+ next.error(WireFrameException(err))
+ }
+ is MissingWireFrameBytes -> IO {
logEndOfData()
next.complete()
- } else {
- logDecodedWireMessage(frame)
- next.next(frame)
}
- } catch (ex: Exception) {
- next.error(ex)
}
}
-
- private fun decodeFirstFrameFromBuffer(): WireFrame? =
- try {
- decoder.decodeFirst(streamBuffer)
- } catch (ex: MissingWireFrameBytesException) {
- logger.trace { "${ex.message} - waiting for more data" }
- null
- }
+ private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
+ IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ }
private fun logIncomingMessage(wire: ByteBuf) {
@@ -90,6 +94,6 @@ internal class WireChunkDecoder(
}
companion object {
- val logger = Logger(VesHvCollector::class)
+ val logger = Logger(WireChunkDecoder::class)
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
index 6e1ce935..83a7cd85 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
@@ -17,10 +17,13 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.domain.exceptions
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)")
+class WireFrameException(error: WireFrameDecodingError)
+ : Exception("${error::class.simpleName}: ${error.message}")
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index 1ddcc3dc..33f71684 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -30,7 +30,6 @@ import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
import reactor.test.test
/**
@@ -43,11 +42,11 @@ internal object WireChunkDecoderTest : Spek({
val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
val encoder = WireFrameEncoder(alloc)
-
+
fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
-
+
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
assertThat(bb.refCnt())
@@ -90,7 +89,7 @@ internal object WireChunkDecoderTest : Spek({
it("should yield error") {
createInstance().decode(input).test()
- .verifyError(InvalidWireFrameMarkerException::class.java)
+ .verifyError(WireFrameException::class.java)
}
it("should leave memory unreleased") {
@@ -132,7 +131,7 @@ internal object WireChunkDecoderTest : Spek({
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyError(InvalidWireFrameMarkerException::class.java)
+ .verifyError(WireFrameException::class.java)
}
it("should leave memory unreleased") {
@@ -170,7 +169,7 @@ internal object WireChunkDecoderTest : Spek({
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .verifyError(InvalidWireFrameMarkerException::class.java)
+ .verifyError(WireFrameException::class.java)
}
it("should release memory for 1st input") {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index c68f0514..00739fa4 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -41,12 +41,13 @@ import java.time.Duration
import java.util.*
import kotlin.system.measureTimeMillis
-
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
object PerformanceSpecification : Spek({
+ debugRx(false)
+
describe("VES High Volume Collector performance") {
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
@@ -69,8 +70,8 @@ object PerformanceSpecification : Spek({
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
- println("Processed $runs connections each containing $numMessages msgs.")
- println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+ logger.info("Processed $runs connections each containing $numMessages msgs.")
+ logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
@@ -94,7 +95,7 @@ object PerformanceSpecification : Spek({
.timeout(timeout)
.block()
- println("Forwarded ${sink.count} msgs")
+ logger.info("Forwarded ${sink.count} msgs")
assertThat(sink.count)
.describedAs("should send up to number of events")
.isLessThan(numMessages)
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 08450598..49eeddaa 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -19,12 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
-import com.google.protobuf.InvalidProtocolBufferException
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -35,6 +33,8 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
* @since May 2018
*/
object VesHvSpecification : Spek({
+ debugRx(false)
+
describe("VES High Volume Collector") {
it("should handle multiple HV RAN events") {
val sink = StoringSink()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
index 8895d642..8895d642 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt
index ff452a7a..29df8c70 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt
@@ -17,13 +17,22 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.domain.exceptions
+package org.onap.dcae.collectors.veshv.tests.component
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.jetbrains.spek.api.dsl.SpecBody
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Hooks
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException(
- "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+fun SpecBody.debugRx(debug: Boolean = true) {
+ if (debug) {
+ beforeGroup {
+ Hooks.onOperatorDebug()
+ }
+
+ afterGroup {
+ Hooks.resetOnOperatorDebug()
+ }
+ }
+}
+
+val logger = Logger("org.onap.dcae.collectors.veshv.tests.component")
diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml
index c11510ac..85c2a45a 100644
--- a/hv-collector-domain/pom.xml
+++ b/hv-collector-domain/pom.xml
@@ -103,6 +103,10 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core</artifactId>
+ </dependency>
<dependency>
<groupId>org.assertj</groupId>
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 3cd9b19a..22767ed3 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.domain
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,49 +50,37 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) {
*/
class WireFrameDecoder {
- fun decodeFirst(byteBuf: ByteBuf): WireFrame {
- verifyNotEmpty(byteBuf)
- byteBuf.markReaderIndex()
-
- verifyMarker(byteBuf)
- verifyMinimumSize(byteBuf)
-
- val version = byteBuf.readUnsignedByte()
- val payloadTypeRaw = byteBuf.readUnsignedByte()
- val payloadSize = verifyPayloadSize(byteBuf)
- val payload = ByteData.readFrom(byteBuf, payloadSize)
+ fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+ when {
+ isEmpty(byteBuf) -> Left(EmptyWireFrame)
+ headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
+ else -> parseFrame(byteBuf)
+ }
- return WireFrame(payload, version, payloadTypeRaw, payloadSize)
- }
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
- private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
- byteBuf.readInt().let { payloadSize ->
- if (byteBuf.readableBytes() < payloadSize) {
- byteBuf.resetReaderIndex()
- throw MissingWireFrameBytesException("readable bytes < payload size")
- } else {
- payloadSize
- }
- }
+ private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun verifyMinimumSize(byteBuf: ByteBuf) {
- if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) {
- byteBuf.resetReaderIndex()
- throw MissingWireFrameBytesException("readable bytes < header size")
- }
- }
+ private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+ byteBuf.markReaderIndex()
- private fun verifyMarker(byteBuf: ByteBuf) {
val mark = byteBuf.readUnsignedByte()
if (mark != WireFrame.MARKER_BYTE) {
byteBuf.resetReaderIndex()
- throw InvalidWireFrameMarkerException(mark)
+ return Left(InvalidWireFrameMarker(mark))
}
- }
- private fun verifyNotEmpty(byteBuf: ByteBuf) {
- if (byteBuf.readableBytes() < 1) {
- throw EmptyWireFrameException()
+ val version = byteBuf.readUnsignedByte()
+ val payloadTypeRaw = byteBuf.readUnsignedByte()
+
+ val payloadSize = byteBuf.readInt()
+ if (byteBuf.readableBytes() < payloadSize) {
+ byteBuf.resetReaderIndex()
+ return Left(MissingWireFramePayloadBytes)
}
+
+ val payload = ByteData.readFrom(byteBuf, payloadSize)
+
+ return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index 7e4b3cef..fb225202 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -17,10 +17,29 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.domain.exceptions
+package org.onap.dcae.collectors.veshv.domain
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg)
+
+sealed class WireFrameDecodingError(val message: String)
+
+
+// Invalid frame errors
+
+sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
+
+class InvalidWireFrameMarker(actualMarker: Short)
+ : InvalidWireFrame(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+
+
+// Missing bytes errors
+
+sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
+
+object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
+object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
+object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
deleted file mode 100644
index 11013834..00000000
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.domain.exceptions
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-open class WireFrameDecodingException(msg: String) : Exception(msg)
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 9694caf7..a97d889c 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -19,16 +19,17 @@
*/
package org.onap.dcae.collectors.veshv.domain
+import arrow.core.Either
+import arrow.core.identity
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.assertj.core.api.Assertions.fail
+import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
import java.nio.charset.Charset
/**
@@ -119,7 +120,7 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded)
+ val decoded = decoder.decodeFirst(encoded).getOrFail()
it("should decode version") {
assertThat(decoded.version).isEqualTo(frame.version)
@@ -146,7 +147,7 @@ object WireFrameCodecsTest : Spek({
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff)
+ val decoded = decoder.decodeFirst(buff).getOrFail()
assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
assertThat(buff.readableBytes()).isEqualTo(1)
@@ -156,8 +157,8 @@ object WireFrameCodecsTest : Spek({
val buff = Unpooled.buffer()
.writeByte(0xFF)
- assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
- .isThrownBy { decoder.decodeFirst(buff) }
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+
}
it("should throw exception when first byte is not 0xFF but length looks ok") {
@@ -165,16 +166,14 @@ object WireFrameCodecsTest : Spek({
.writeByte(0xAA)
.writeBytes("some garbage".toByteArray())
- assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
- .isThrownBy { decoder.decodeFirst(buff) }
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
}
it("should throw exception when first byte is not 0xFF and length is to short") {
val buff = Unpooled.buffer()
.writeByte(0xAA)
- assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
- .isThrownBy { decoder.decodeFirst(buff) }
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
}
it("should throw exception when payload doesn't fit") {
@@ -182,11 +181,17 @@ object WireFrameCodecsTest : Spek({
.writeBytes(encodeSampleFrame())
buff.writerIndex(buff.writerIndex() - 2)
- assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
- .isThrownBy { decoder.decodeFirst(buff) }
+ decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
}
}
}
-}) \ No newline at end of file
+})
+
+private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {
+ fold({ assertj(assertThat(it)) }, { fail("Error expected") })
+}
+
+private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
+ fold({ fail(it.message) }, ::identity) as WireFrame