diff options
Diffstat (limited to 'hv-collector-core/src')
3 files changed, 57 insertions, 25 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index d1d72592..502505c4 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -19,14 +19,17 @@ */ package org.onap.dcae.collectors.veshv.impl.wire +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame +import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException -import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux +import reactor.core.publisher.SynchronousSink /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -53,28 +56,29 @@ internal class WireChunkDecoder( } private fun generateFrames(): Flux<WireFrame> = Flux.generate { next -> - try { - val frame = decodeFirstFrameFromBuffer() - if (frame == null) { + decoder.decodeFirst(streamBuffer) + .fold(onError(next), onSuccess(next)) + .unsafeRunSync() + } + + private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err -> + when (err) { + is InvalidWireFrame -> IO { + next.error(WireFrameException(err)) + } + is MissingWireFrameBytes -> IO { logEndOfData() next.complete() - } else { - logDecodedWireMessage(frame) - next.next(frame) } - } catch (ex: Exception) { - next.error(ex) } } - - private fun decodeFirstFrameFromBuffer(): WireFrame? = - try { - decoder.decodeFirst(streamBuffer) - } catch (ex: MissingWireFrameBytesException) { - logger.trace { "${ex.message} - waiting for more data" } - null - } + private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame -> + IO { + logDecodedWireMessage(frame) + next.next(frame) + } + } private fun logIncomingMessage(wire: ByteBuf) { @@ -90,6 +94,6 @@ internal class WireChunkDecoder( } companion object { - val logger = Logger(VesHvCollector::class) + val logger = Logger(WireChunkDecoder::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt new file mode 100644 index 00000000..83a7cd85 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class WireFrameException(error: WireFrameDecodingError) + : Exception("${error::class.simpleName}: ${error.message}") diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index 1ddcc3dc..33f71684 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,7 +30,6 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import reactor.test.test /** @@ -43,11 +42,11 @@ internal object WireChunkDecoderTest : Spek({ val anotherPayload = "ala ma kota a kot ma ale".toByteArray() val encoder = WireFrameEncoder(alloc) - + fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) - + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { assertThat(bb.refCnt()) @@ -90,7 +89,7 @@ internal object WireChunkDecoderTest : Spek({ it("should yield error") { createInstance().decode(input).test() - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should leave memory unreleased") { @@ -132,7 +131,7 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frame and error") { createInstance().decode(input).test() .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should leave memory unreleased") { @@ -170,7 +169,7 @@ internal object WireChunkDecoderTest : Spek({ .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .verifyError(InvalidWireFrameMarkerException::class.java) + .verifyError(WireFrameException::class.java) } it("should release memory for 1st input") { |