diff options
12 files changed, 126 insertions, 120 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index d1d72592..502505c4 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,14 +19,17 @@ */ package org.onap.dcae.collectors.veshv.impl.wire +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame +import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux +import reactor.core.publisher.SynchronousSink /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -53,28 +56,29 @@ internal class WireChunkDecoder( } private fun generateFrames(): Flux<WireFrame> = Flux.generate { next -> - try { - val frame = decodeFirstFrameFromBuffer() - if (frame == null) { + decoder.decodeFirst(streamBuffer) + .fold(onError(next), onSuccess(next)) + .unsafeRunSync() + } + + private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err -> + when (err) { + is InvalidWireFrame -> IO { + next.error(WireFrameException(err)) + } + is MissingWireFrameBytes -> IO { logEndOfData() next.complete() - } else { - logDecodedWireMessage(frame) - next.next(frame) } - } catch (ex: Exception) { - next.error(ex) } } - - private fun decodeFirstFrameFromBuffer(): WireFrame? = - try { - decoder.decodeFirst(streamBuffer) - } catch (ex: MissingWireFrameBytesException) { - logger.trace { "${ex.message} - waiting for more data" } - null - } + private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame -> + IO { + logDecodedWireMessage(frame) + next.next(frame) + } + } private fun logIncomingMessage(wire: ByteBuf) { @@ -90,6 +94,6 @@ internal class WireChunkDecoder( } companion object { - val logger = Logger(VesHvCollector::class) + val logger = Logger(WireChunkDecoder::class) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt index 6e1ce935..83a7cd85 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt @@ -17,10 +17,13 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.domain.exceptions +package org.onap.dcae.collectors.veshv.impl.wire + +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)") +class WireFrameException(error: WireFrameDecodingError) + : Exception("${error::class.simpleName}: ${error.message}") diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index 1ddcc3dc..33f71684 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,7 +30,6 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import reactor.test.test /** @@ -43,11 +42,11 @@ internal object WireChunkDecoderTest : Spek({ val anotherPayload = "ala ma kota a kot ma ale".toByteArray() val encoder = WireFrameEncoder(alloc) - + fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) - + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { assertThat(bb.refCnt()) @@ -90,7 +89,7 @@ internal object WireChunkDecoderTest : Spek({ it("should yield error") { createInstance().decode(input).test() - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should leave memory unreleased") { @@ -132,7 +131,7 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frame and error") { createInstance().decode(input).test() .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should leave memory unreleased") { @@ -170,7 +169,7 @@ internal object WireChunkDecoderTest : Spek({ .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should release memory for 1st input") { diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index c68f0514..00739fa4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -41,12 +41,13 @@ import java.time.Duration import java.util.* import kotlin.system.measureTimeMillis - /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ object PerformanceSpecification : Spek({ + debugRx(false) + describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() @@ -69,8 +70,8 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec - println("Processed $runs connections each containing $numMessages msgs.") - println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + logger.info("Processed $runs connections each containing $numMessages msgs.") + logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -94,7 +95,7 @@ object PerformanceSpecification : Spek({ .timeout(timeout) .block() - println("Forwarded ${sink.count} msgs") + logger.info("Forwarded ${sink.count} msgs") assertThat(sink.count) .describedAs("should send up to number of events") .isLessThan(numMessages) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 08450598..49eeddaa 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -19,12 +19,10 @@ */ package org.onap.dcae.collectors.veshv.tests.component -import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration @@ -35,6 +33,8 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain * @since May 2018 */ object VesHvSpecification : Spek({ + debugRx(false) + describe("VES High Volume Collector") { it("should handle multiple HV RAN events") { val sink = StoringSink() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt index 8895d642..8895d642 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt index ff452a7a..29df8c70 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt @@ -17,13 +17,22 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.domain.exceptions +package org.onap.dcae.collectors.veshv.tests.component -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.jetbrains.spek.api.dsl.SpecBody +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Hooks -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException( - "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) +fun SpecBody.debugRx(debug: Boolean = true) { + if (debug) { + beforeGroup { + Hooks.onOperatorDebug() + } + + afterGroup { + Hooks.resetOnOperatorDebug() + } + } +} + +val logger = Logger("org.onap.dcae.collectors.veshv.tests.component") diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml index c11510ac..85c2a45a 100644 --- a/hv-collector-domain/pom.xml +++ b/hv-collector-domain/pom.xml @@ -103,6 +103,10 @@ <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-core</artifactId> + </dependency> <dependency> <groupId>org.assertj</groupId> diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index 3cd9b19a..22767ed3 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.domain +import arrow.core.Either +import arrow.core.Left +import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -50,49 +50,37 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) { */ class WireFrameDecoder { - fun decodeFirst(byteBuf: ByteBuf): WireFrame { - verifyNotEmpty(byteBuf) - byteBuf.markReaderIndex() - - verifyMarker(byteBuf) - verifyMinimumSize(byteBuf) - - val version = byteBuf.readUnsignedByte() - val payloadTypeRaw = byteBuf.readUnsignedByte() - val payloadSize = verifyPayloadSize(byteBuf) - val payload = ByteData.readFrom(byteBuf, payloadSize) + fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> = + when { + isEmpty(byteBuf) -> Left(EmptyWireFrame) + headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) + else -> parseFrame(byteBuf) + } - return WireFrame(payload, version, payloadTypeRaw, payloadSize) - } + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE - private fun verifyPayloadSize(byteBuf: ByteBuf): Int = - byteBuf.readInt().let { payloadSize -> - if (byteBuf.readableBytes() < payloadSize) { - byteBuf.resetReaderIndex() - throw MissingWireFrameBytesException("readable bytes < payload size") - } else { - payloadSize - } - } + private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun verifyMinimumSize(byteBuf: ByteBuf) { - if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) { - byteBuf.resetReaderIndex() - throw MissingWireFrameBytesException("readable bytes < header size") - } - } + private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> { + byteBuf.markReaderIndex() - private fun verifyMarker(byteBuf: ByteBuf) { val mark = byteBuf.readUnsignedByte() if (mark != WireFrame.MARKER_BYTE) { byteBuf.resetReaderIndex() - throw InvalidWireFrameMarkerException(mark) + return Left(InvalidWireFrameMarker(mark)) } - } - private fun verifyNotEmpty(byteBuf: ByteBuf) { - if (byteBuf.readableBytes() < 1) { - throw EmptyWireFrameException() + val version = byteBuf.readUnsignedByte() + val payloadTypeRaw = byteBuf.readUnsignedByte() + + val payloadSize = byteBuf.readInt() + if (byteBuf.readableBytes() < payloadSize) { + byteBuf.resetReaderIndex() + return Left(MissingWireFramePayloadBytes) } + + val payload = ByteData.readFrom(byteBuf, payloadSize) + + return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize)) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index 7e4b3cef..fb225202 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -17,10 +17,29 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.domain.exceptions +package org.onap.dcae.collectors.veshv.domain /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg) + +sealed class WireFrameDecodingError(val message: String) + + +// Invalid frame errors + +sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) + +class InvalidWireFrameMarker(actualMarker: Short) + : InvalidWireFrame( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) + + +// Missing bytes errors + +sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) + +object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") +object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") +object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt deleted file mode 100644 index 11013834..00000000 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.domain.exceptions - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -open class WireFrameDecodingException(msg: String) : Exception(msg) diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 9694caf7..a97d889c 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -19,16 +19,17 @@ */ package org.onap.dcae.collectors.veshv.domain +import arrow.core.Either +import arrow.core.identity import io.netty.buffer.Unpooled import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.assertj.core.api.Assertions.fail +import org.assertj.core.api.ObjectAssert import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException import java.nio.charset.Charset /** @@ -119,7 +120,7 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded) + val decoded = decoder.decodeFirst(encoded).getOrFail() it("should decode version") { assertThat(decoded.version).isEqualTo(frame.version) @@ -146,7 +147,7 @@ object WireFrameCodecsTest : Spek({ val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff) + val decoded = decoder.decodeFirst(buff).getOrFail() assertThat(decoded.isValid()).describedAs("should be valid").isTrue() assertThat(buff.readableBytes()).isEqualTo(1) @@ -156,8 +157,8 @@ object WireFrameCodecsTest : Spek({ val buff = Unpooled.buffer() .writeByte(0xFF) - assertThatExceptionOfType(MissingWireFrameBytesException::class.java) - .isThrownBy { decoder.decodeFirst(buff) } + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } + } it("should throw exception when first byte is not 0xFF but length looks ok") { @@ -165,16 +166,14 @@ object WireFrameCodecsTest : Spek({ .writeByte(0xAA) .writeBytes("some garbage".toByteArray()) - assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) - .isThrownBy { decoder.decodeFirst(buff) } + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) } } it("should throw exception when first byte is not 0xFF and length is to short") { val buff = Unpooled.buffer() .writeByte(0xAA) - assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) - .isThrownBy { decoder.decodeFirst(buff) } + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) } } it("should throw exception when payload doesn't fit") { @@ -182,11 +181,17 @@ object WireFrameCodecsTest : Spek({ .writeBytes(encodeSampleFrame()) buff.writerIndex(buff.writerIndex() - 2) - assertThatExceptionOfType(MissingWireFrameBytesException::class.java) - .isThrownBy { decoder.decodeFirst(buff) } + decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) } } } } -})
\ No newline at end of file +}) + +private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) { + fold({ assertj(assertThat(it)) }, { fail("Error expected") }) +} + +private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame = + fold({ fail(it.message) }, ::identity) as WireFrame |