diff options
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt')
-rw-r--r-- | sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt new file mode 100644 index 00000000..0897e910 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -0,0 +1,203 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import arrow.syntax.function.partially1 +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import reactor.core.publisher.Flux +import reactor.math.sum +import java.security.MessageDigest +import java.time.Duration +import java.util.* +import kotlin.system.measureTimeMillis + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object PerformanceSpecification : Spek({ + debugRx(false) + + describe("VES High Volume Collector performance") { + it("should handle multiple clients in reasonable time") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 300_000 + val runs = 4 + val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) + + val params = MessageParameters( + commonEventHeader = commonHeader(PERF3GPP), + messageType = VALID, + amount = numMessages + ) + + val fluxes = (1.rangeTo(runs)).map { + sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + } + val durationMs = measureTimeMillis { + Flux.merge(fluxes).then().block(timeout) + } + + val durationSec = durationMs / 1000.0 + val throughput = sink.count / durationSec + logger.info("Processed $runs connections each containing $numMessages msgs.") + logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + assertThat(sink.count) + .describedAs("should send all events") + .isEqualTo(runs * numMessages) + } + + it("should disconnect on transmission errors") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 100_000 + val timeout = Duration.ofSeconds(30) + + val params = MessageParameters( + commonEventHeader = commonHeader(PERF3GPP), + messageType = VALID, + amount = numMessages + ) + + val dataStream = generateDataStream(sut.alloc, params) + .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) + sut.collector.handleConnection(sut.alloc, dataStream) + .timeout(timeout) + .block() + + logger.info("Forwarded ${sink.count} msgs") + assertThat(sink.count) + .describedAs("should send up to number of events") + .isLessThan(numMessages) + } + } + + describe("test infrastructure") { + val digest = MessageDigest.getInstance("MD5") + + fun collectDigest(bb: ByteBuf) { + bb.markReaderIndex() + while (bb.isReadable) { + digest.update(bb.readByte()) + } + bb.resetReaderIndex() + } + + fun calculateDigest(arrays: List<ByteArray>): ByteArray { + for (array in arrays) { + digest.update(array) + } + return digest.digest() + } + + it("should yield same bytes as in the input") { + val numberOfBuffers = 10 + val singleBufferSize = 1000 + val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) } + val inputDigest = calculateDigest(arrays) + + val actualTotalSize = Flux.fromIterable(arrays) + .map { Unpooled.wrappedBuffer(it) } + .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) } + .doOnNext(::collectDigest) + .map { + val size = it.readableBytes() + it.release() + size + } + .sum() + .map(Long::toInt) + .block() + + val outputDigest = digest.digest() + + assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize) + assertThat(outputDigest).isEqualTo(inputDigest) + + } + } +}) + + +private const val ONE_MILION = 1_000_000.0 + +private val rand = Random() +private fun randomByteArray(size: Int): ByteArray { + val bytes = ByteArray(size) + rand.nextBytes(bytes) + return bytes +} + +fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> = + stream.index() + .filter { predicate(it.t1) } + .map { it.t2 } + +private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = + WireFrameEncoder(alloc).let { encoder -> + MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES) + .createMessageFlux(listOf(params)) + .map(encoder::encode) + .transform { simulateRemoteTcp(alloc, 1000, it) } + } + +private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) = + byteBuffers + .bufferTimeout(maxSize, Duration.ofMillis(250)) + .map { joinBuffers(alloc, it) } + .concatMap { randomlySplitTcpFrames(it) } + +private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) = + alloc.compositeBuffer().addComponents(true, it) + +private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { + val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt()) + return Flux.create<ByteBuf> { sink -> + while (bb.isReadable) { + val frameSize = Math.min(targetFrameSize, bb.readableBytes()) + sink.next(bb.retainedSlice(bb.readerIndex(), frameSize)) + bb.readerIndex(bb.readerIndex() + frameSize) + } + bb.release() + sink.complete() + } +} + |