diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-28 14:42:05 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 11:04:01 +0200 |
commit | 03702b48989174dc8afa855e663a28e34b4da67b (patch) | |
tree | e89a4930aab1a53014b81e76be493dda41d9e007 /hv-collector-core | |
parent | cf1465f37d20391a921df449d5dd01454f64910c (diff) |
Use Either instead of exceptions in frame decoder
Goals:
* Make code cleaner (in a FP way)
* Avoid costly exception throw each time we wait for the rest of the
frame (collecting stack traces is costly and we do not need them
anyway)
Closes ONAP-437
Change-Id: I40341d3c2cb85f3ff581d89167245cb009dbb070
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
3 files changed, 57 insertions, 25 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index d1d72592..502505c4 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,14 +19,17 @@ */ package org.onap.dcae.collectors.veshv.impl.wire +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame +import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux +import reactor.core.publisher.SynchronousSink /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -53,28 +56,29 @@ internal class WireChunkDecoder( } private fun generateFrames(): Flux<WireFrame> = Flux.generate { next -> - try { - val frame = decodeFirstFrameFromBuffer() - if (frame == null) { + decoder.decodeFirst(streamBuffer) + .fold(onError(next), onSuccess(next)) + .unsafeRunSync() + } + + private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err -> + when (err) { + is InvalidWireFrame -> IO { + next.error(WireFrameException(err)) + } + is MissingWireFrameBytes -> IO { logEndOfData() next.complete() - } else { - logDecodedWireMessage(frame) - next.next(frame) } - } catch (ex: Exception) { - next.error(ex) } } - - private fun decodeFirstFrameFromBuffer(): WireFrame? = - try { - decoder.decodeFirst(streamBuffer) - } catch (ex: MissingWireFrameBytesException) { - logger.trace { "${ex.message} - waiting for more data" } - null - } + private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame -> + IO { + logDecodedWireMessage(frame) + next.next(frame) + } + } private fun logIncomingMessage(wire: ByteBuf) { @@ -90,6 +94,6 @@ internal class WireChunkDecoder( } companion object { - val logger = Logger(VesHvCollector::class) + val logger = Logger(WireChunkDecoder::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt new file mode 100644 index 00000000..83a7cd85 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class WireFrameException(error: WireFrameDecodingError) + : Exception("${error::class.simpleName}: ${error.message}") diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index 1ddcc3dc..33f71684 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,7 +30,6 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import reactor.test.test /** @@ -43,11 +42,11 @@ internal object WireChunkDecoderTest : Spek({ val anotherPayload = "ala ma kota a kot ma ale".toByteArray() val encoder = WireFrameEncoder(alloc) - + fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) - + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { assertThat(bb.refCnt()) @@ -90,7 +89,7 @@ internal object WireChunkDecoderTest : Spek({ it("should yield error") { createInstance().decode(input).test() - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should leave memory unreleased") { @@ -132,7 +131,7 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frame and error") { createInstance().decode(input).test() .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should leave memory unreleased") { @@ -170,7 +169,7 @@ internal object WireChunkDecoderTest : Spek({ .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should release memory for 1st input") { |