diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-07 11:52:16 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 13:06:43 +0200 |
commit | 07bbbf71cd65b29f446a1b475add87f20365db83 (patch) | |
tree | e64fcf12c21e46358043744476d68765634d7f6f /hv-collector-ct | |
parent | 767d0464a19e0949d2919e6df15c9653dec50503 (diff) |
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer
snapshots - not separate messages. That means that we need to join
incomming buffers and then split it into separate WireFrames.
Closes ONAP-312
Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-ct')
4 files changed, 84 insertions, 17 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 1826bcd0..c4e9874f 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -20,12 +20,15 @@ package org.onap.dcae.collectors.veshv.tests.component import io.netty.buffer.ByteBuf +import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.Exceptions import reactor.core.publisher.Flux import java.time.Duration @@ -36,6 +39,7 @@ import java.time.Duration internal class Sut { val configurationProvider = FakeConfigurationProvider() val sink = FakeSink() + val alloc = UnpooledByteBufAllocator.DEFAULT private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) val collectorProvider = collectorFactory.createVesHvCollectorProvider() @@ -43,8 +47,19 @@ internal class Sut { get() = collectorProvider() fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) return sink.sentMessages } + + fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> = + try { + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + Pair(sink.sentMessages, null) + } catch (ex: Exception) { + Pair(sink.sentMessages, ex) + } + + companion object { + val logger = Logger(Sut::class) + } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 26032ff9..fc4fb656 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -19,9 +19,12 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -40,29 +43,76 @@ object VesHvSpecification : Spek({ .describedAs("should send all events") .hasSize(2) } + } + + describe("Memory management") { - system("should release memory for each incoming message") { sut -> + system("should release memory for each handled and dropped message") { sut -> sut.configurationProvider.updateConfiguration(basicConfiguration) + val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) - val msgWithInvalidPayload = invalidVesMessage() val msgWithInvalidFrame = invalidWireFrame() - val validMessage = vesMessage(Domain.HVRANMEAS) - val refCntBeforeSending = msgWithInvalidDomain.refCnt() + val expectedRefCnt = 0 + + val (handledEvents, exception) = sut.handleConnectionReturningError( + validMessage, msgWithInvalidDomain, msgWithInvalidFrame) - sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage) + assertThat(handledEvents).hasSize(1) + assertThat(exception).isNull() + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) assertThat(msgWithInvalidDomain.refCnt()) .describedAs("message with invalid domain should be released") - .isEqualTo(refCntBeforeSending) - assertThat(msgWithInvalidPayload.refCnt()) - .describedAs("message with invalid payload should be released") - .isEqualTo(refCntBeforeSending) + .isEqualTo(expectedRefCnt) assertThat(msgWithInvalidFrame.refCnt()) .describedAs("message with invalid frame should be released") - .isEqualTo(refCntBeforeSending) + .isEqualTo(expectedRefCnt) + + } + + system("should release memory for each message with invalid payload") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val validMessage = vesMessage(Domain.HVRANMEAS) + val msgWithInvalidPayload = invalidVesMessage() + val expectedRefCnt = 0 + + val (handledEvents, exception) = sut.handleConnectionReturningError( + validMessage, msgWithInvalidPayload) + + assertThat(handledEvents).hasSize(1) + assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(expectedRefCnt) + + } + + system("should release memory for each message with garbage frame") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val validMessage = vesMessage(Domain.HVRANMEAS) + val msgWithGarbageFrame = garbageFrame() + val expectedRefCnt = 0 + + val (handledEvents, exception) = sut.handleConnectionReturningError( + validMessage, msgWithGarbageFrame) + + assertThat(handledEvents).hasSize(1) + assertThat(exception?.cause) + .isInstanceOf(InvalidWireFrameMarkerException::class.java) + assertThat(validMessage.refCnt()) .describedAs("handled message should be released") - .isEqualTo(refCntBeforeSending) + .isEqualTo(expectedRefCnt) + assertThat(msgWithGarbageFrame.refCnt()) + .describedAs("message with garbage frame should be released") + .isEqualTo(expectedRefCnt) + } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt index b6342b11..998f3140 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -54,6 +54,10 @@ fun invalidVesMessage() = alocator.buffer().run { } +fun garbageFrame() = alocator.buffer().run { + writeBytes("the meaning of life is &@)(*_!".toByteArray()) +} + fun invalidWireFrame() = alocator.buffer().run { writeByte(0xFF) writeByte(1) @@ -65,6 +69,7 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() .setVersion("1.0") + .setEventName("xyz") .setEventId(id) .setDomain(domain) .setEventName("Sample event name") @@ -76,6 +81,3 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr .setSequence(1)) .setHvRanMeasFields(ByteString.EMPTY) .build() - - - diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml index 809f62d4..84abc9d3 100644 --- a/hv-collector-ct/src/test/resources/logback-test.xml +++ b/hv-collector-ct/src/test/resources/logback-test.xml @@ -26,7 +26,7 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> |