summaryrefslogtreecommitdiffstats
path: root/hv-collector-ct
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 11:52:16 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 13:06:43 +0200
commit07bbbf71cd65b29f446a1b475add87f20365db83 (patch)
treee64fcf12c21e46358043744476d68765634d7f6f /hv-collector-ct
parent767d0464a19e0949d2919e6df15c9653dec50503 (diff)
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-ct')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt19
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt72
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt8
-rw-r--r--hv-collector-ct/src/test/resources/logback-test.xml2
4 files changed, 84 insertions, 17 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 1826bcd0..c4e9874f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -20,12 +20,15 @@
package org.onap.dcae.collectors.veshv.tests.component
import io.netty.buffer.ByteBuf
+import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.Exceptions
import reactor.core.publisher.Flux
import java.time.Duration
@@ -36,6 +39,7 @@ import java.time.Duration
internal class Sut {
val configurationProvider = FakeConfigurationProvider()
val sink = FakeSink()
+ val alloc = UnpooledByteBufAllocator.DEFAULT
private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink))
val collectorProvider = collectorFactory.createVesHvCollectorProvider()
@@ -43,8 +47,19 @@ internal class Sut {
get() = collectorProvider()
fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
-
+ collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
return sink.sentMessages
}
+
+ fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> =
+ try {
+ collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ Pair(sink.sentMessages, null)
+ } catch (ex: Exception) {
+ Pair(sink.sentMessages, ex)
+ }
+
+ companion object {
+ val logger = Logger(Sut::class)
+ }
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 26032ff9..fc4fb656 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -19,9 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import com.google.protobuf.InvalidProtocolBufferException
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -40,29 +43,76 @@ object VesHvSpecification : Spek({
.describedAs("should send all events")
.hasSize(2)
}
+ }
+
+ describe("Memory management") {
- system("should release memory for each incoming message") { sut ->
+ system("should release memory for each handled and dropped message") { sut ->
sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
- val msgWithInvalidPayload = invalidVesMessage()
val msgWithInvalidFrame = invalidWireFrame()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val refCntBeforeSending = msgWithInvalidDomain.refCnt()
+ val expectedRefCnt = 0
+
+ val (handledEvents, exception) = sut.handleConnectionReturningError(
+ validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
- sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
+ assertThat(handledEvents).hasSize(1)
+ assertThat(exception).isNull()
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidDomain.refCnt())
.describedAs("message with invalid domain should be released")
- .isEqualTo(refCntBeforeSending)
- assertThat(msgWithInvalidPayload.refCnt())
- .describedAs("message with invalid payload should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
+
+ }
+
+ system("should release memory for each message with invalid payload") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val msgWithInvalidPayload = invalidVesMessage()
+ val expectedRefCnt = 0
+
+ val (handledEvents, exception) = sut.handleConnectionReturningError(
+ validMessage, msgWithInvalidPayload)
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithInvalidPayload.refCnt())
+ .describedAs("message with invalid payload should be released")
+ .isEqualTo(expectedRefCnt)
+
+ }
+
+ system("should release memory for each message with garbage frame") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val msgWithGarbageFrame = garbageFrame()
+ val expectedRefCnt = 0
+
+ val (handledEvents, exception) = sut.handleConnectionReturningError(
+ validMessage, msgWithGarbageFrame)
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(exception?.cause)
+ .isInstanceOf(InvalidWireFrameMarkerException::class.java)
+
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithGarbageFrame.refCnt())
+ .describedAs("message with garbage frame should be released")
+ .isEqualTo(expectedRefCnt)
+
}
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
index b6342b11..998f3140 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
@@ -54,6 +54,10 @@ fun invalidVesMessage() = alocator.buffer().run {
}
+fun garbageFrame() = alocator.buffer().run {
+ writeBytes("the meaning of life is &@)(*_!".toByteArray())
+}
+
fun invalidWireFrame() = alocator.buffer().run {
writeByte(0xFF)
writeByte(1)
@@ -65,6 +69,7 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
.setVersion("1.0")
+ .setEventName("xyz")
.setEventId(id)
.setDomain(domain)
.setEventName("Sample event name")
@@ -76,6 +81,3 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr
.setSequence(1))
.setHvRanMeasFields(ByteString.EMPTY)
.build()
-
-
-
diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml
index 809f62d4..84abc9d3 100644
--- a/hv-collector-ct/src/test/resources/logback-test.xml
+++ b/hv-collector-ct/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>