Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt')
-rw-r--r--  sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt  203
1 file changed, 203 insertions, 0 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
new file mode 100644
index 00000000..0897e910
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -0,0 +1,203 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import arrow.syntax.function.partially1
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.CompositeByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import reactor.core.publisher.Flux
+import reactor.math.sum
+import java.security.MessageDigest
+import java.time.Duration
+import java.util.Random
+import kotlin.system.measureTimeMillis
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object PerformanceSpecification : Spek({
+ debugRx(false)
+
+ describe("VES High Volume Collector performance") {
+ it("should handle multiple clients in reasonable time") {
+ val sink = CountingSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val numMessages: Long = 300_000
+ val runs = 4
+ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
+
+ val params = MessageParameters(
+ commonEventHeader = commonHeader(PERF3GPP),
+ messageType = VALID,
+ amount = numMessages
+ )
+
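+            // Open `runs` concurrent simulated connections and wait until the collector has drained them all.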
+ val fluxes = (1.rangeTo(runs)).map {
+ sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+ }
+ val durationMs = measureTimeMillis {
+ Flux.merge(fluxes).then().block(timeout)
+ }
+
+ val durationSec = durationMs / 1000.0
+ val throughput = sink.count / durationSec
+ logger.info("Processed $runs connections each containing $numMessages msgs.")
+            logger.info("Forwarded ${sink.count / ONE_MILLION} million msgs in $durationSec seconds, that is $throughput msgs/s")
+ assertThat(sink.count)
+ .describedAs("should send all events")
+ .isEqualTo(runs * numMessages)
+ }
+
+ it("should disconnect on transmission errors") {
+ val sink = CountingSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val numMessages: Long = 100_000
+ val timeout = Duration.ofSeconds(30)
+
+ val params = MessageParameters(
+ commonEventHeader = commonHeader(PERF3GPP),
+ messageType = VALID,
+ amount = numMessages
+ )
+
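+            // Corrupt the stream by removing some wire-frame buffers so the collector hits a decoding error and disconnects.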
+ val dataStream = generateDataStream(sut.alloc, params)
+ .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
+ sut.collector.handleConnection(sut.alloc, dataStream)
+ .timeout(timeout)
+ .block()
+
+ logger.info("Forwarded ${sink.count} msgs")
+ assertThat(sink.count)
+ .describedAs("should send up to number of events")
+ .isLessThan(numMessages)
+ }
+ }
+
+ describe("test infrastructure") {
+ val digest = MessageDigest.getInstance("MD5")
+
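+        // MD5 digests of the input arrays and of the re-framed output are compared to verify that
+        // simulateRemoteTcp only re-chunks the stream without altering its bytes.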
+ fun collectDigest(bb: ByteBuf) {
+ bb.markReaderIndex()
+ while (bb.isReadable) {
+ digest.update(bb.readByte())
+ }
+ bb.resetReaderIndex()
+ }
+
+ fun calculateDigest(arrays: List<ByteArray>): ByteArray {
+ for (array in arrays) {
+ digest.update(array)
+ }
+ return digest.digest()
+ }
+
+ it("should yield same bytes as in the input") {
+ val numberOfBuffers = 10
+ val singleBufferSize = 1000
+ val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
+ val inputDigest = calculateDigest(arrays)
+
+ val actualTotalSize = Flux.fromIterable(arrays)
+ .map { Unpooled.wrappedBuffer(it) }
+ .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
+ .doOnNext(::collectDigest)
+ .map {
+ val size = it.readableBytes()
+ it.release()
+ size
+ }
+ .sum()
+ .map(Long::toInt)
+ .block()
+
+ val outputDigest = digest.digest()
+
+ assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
+ assertThat(outputDigest).isEqualTo(inputDigest)
+
+ }
+ }
+})
+
+
+private const val ONE_MILLION = 1_000_000.0
+
+private val rand = Random()
+private fun randomByteArray(size: Int): ByteArray {
+ val bytes = ByteArray(size)
+ rand.nextBytes(bytes)
+ return bytes
+}
+
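+// Simulates transmission errors by removing wire-frame buffers from the stream based on their index.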
+private fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
+        stream.index()
+                .filter { !predicate(it.t1) }
+                .map { it.t2 }
+
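+// Generates VES events for the given parameters, encodes each as a wire frame and re-chunks the result
+// the way a remote TCP peer would.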
+private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
+ WireFrameEncoder(alloc).let { encoder ->
+ MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
+ .createMessageFlux(listOf(params))
+ .map(encoder::encode)
+ .transform { simulateRemoteTcp(alloc, 1000, it) }
+ }
+
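+// Groups encoded frames into batches (up to maxSize buffers or 250 ms), merges each batch and re-splits it
+// at arbitrary boundaries, mimicking TCP segmentation.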
+private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
+ byteBuffers
+ .bufferTimeout(maxSize, Duration.ofMillis(250))
+ .map { joinBuffers(alloc, it) }
+ .concatMap { randomlySplitTcpFrames(it) }
+
+private fun joinBuffers(alloc: ByteBufAllocator, buffers: List<ByteBuf>) =
+        alloc.compositeBuffer().addComponents(true, buffers)
+
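+// Slices the merged buffer into frames of one randomly chosen size (at least 4 bytes) and releases
+// the source buffer once fully read.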
+private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
+ val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
+ return Flux.create<ByteBuf> { sink ->
+ while (bb.isReadable) {
+ val frameSize = Math.min(targetFrameSize, bb.readableBytes())
+ sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
+ bb.readerIndex(bb.readerIndex() + frameSize)
+ }
+ bb.release()
+ sink.complete()
+ }
+}
+