summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-02-04 15:20:14 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-02-15 15:09:48 +0100
commitdf17f466577b97a12fac39b64b5d113f32b82f2e (patch)
tree0a8999e593c90f97ed1b4f45b6e8adbbc110a787 /sources/hv-collector-ct
parente7204cbcf6af61856330cffc541b6f5c78476a09 (diff)
Generate VesEvents in hv-ves/message-generator
- Split message generator on two specialized generators for VesEvent and WireFrame related message types - Refactor whole message-generator module Change-Id: I1266b549a9a4d27213d03e8921298deab2dacb59 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1162
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt24
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt1
2 files changed, 15 insertions, 10 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index ef4ce967..dc5fe60b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -31,11 +31,13 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import reactor.core.publisher.Flux
import reactor.math.sum
@@ -61,9 +63,9 @@ object PerformanceSpecification : Spek({
val runs = 4
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
- val params = MessageParameters(
+ val params = VesEventParameters(
commonEventHeader = commonHeader(PERF3GPP),
- messageType = VALID,
+ messageType = VesEventType.VALID,
amount = numMessages
)
@@ -91,9 +93,9 @@ object PerformanceSpecification : Spek({
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
- val params = MessageParameters(
+ val params = VesEventParameters(
commonEventHeader = commonHeader(PERF3GPP),
- messageType = VALID,
+ messageType = VesEventType.VALID,
amount = numMessages
)
@@ -158,8 +160,9 @@ object PerformanceSpecification : Spek({
private const val ONE_MILION = 1_000_000.0
-
private val rand = Random()
+private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
+
private fun randomByteArray(size: Int): ByteArray {
val bytes = ByteArray(size)
rand.nextBytes(bytes)
@@ -171,10 +174,11 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt
.filter { predicate(it.t1) }
.map { it.t2 }
-private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
+private fun generateDataStream(alloc: ByteBufAllocator, params: VesEventParameters): Flux<ByteBuf> =
WireFrameEncoder(alloc).let { encoder ->
- MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
- .createMessageFlux(listOf(params))
+ generatorsFactory.createVesEventGenerator()
+ .createMessageFlux(params)
+ .map { WireFrameMessage(it.toByteArray()) }
.map(encoder::encode)
.transform { simulateRemoteTcp(alloc, 1000, it) }
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 30661e84..ed79e3e2 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -69,6 +69,7 @@ class Sut(sink: Sink = StoringSink()): AutoCloseable {
}
}
+
class DummySinkProvider(private val sink: Sink) : SinkProvider {
private val active = AtomicBoolean(true)