diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-27 12:30:56 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 10:39:41 +0200 |
commit | 678af1b5172eb3b214584de91ece3f8df163c5e9 (patch) | |
tree | 984c0cd15158183c3d038a08163737cd5e34a91b /hv-collector-ct/src/test/kotlin | |
parent | 553154ae42e5362dacab6c190b8cf1e1388f5b87 (diff) |
Write performance tests
Closes ONAP-434
Change-Id: I1139848f32ac19a4d0a0fd595f4b07c10cd83db0
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-ct/src/test/kotlin')
6 files changed, 249 insertions, 75 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt new file mode 100644 index 00000000..c68f0514 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -0,0 +1,193 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import arrow.syntax.function.partially1 +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters +import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator +import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import reactor.core.publisher.Flux +import reactor.math.sum +import java.security.MessageDigest +import java.time.Duration +import java.util.* +import kotlin.system.measureTimeMillis + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object PerformanceSpecification : Spek({ + describe("VES High Volume Collector performance") { + it("should handle multiple clients in reasonable time") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 300_000 + val runs = 4 + val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) + + val params = MessageParameters( + commonEventHeader = vesEvent().commonEventHeader, + amount = numMessages) + val fluxes = (1.rangeTo(runs)).map { + sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + } + val durationMs = measureTimeMillis { + Flux.merge(fluxes).then().block(timeout) + } + + val durationSec = durationMs / 1000.0 + val throughput = sink.count / durationSec + println("Processed $runs connections each containing $numMessages msgs.") + println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + assertThat(sink.count) + .describedAs("should send all events") + .isEqualTo(runs * numMessages) + } + + it("should disconnect on transmission errors") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 100_000 + val timeout = Duration.ofSeconds(30) + + val params = MessageParameters( + commonEventHeader = vesEvent().commonEventHeader, + amount = numMessages) + + val dataStream = generateDataStream(sut.alloc, params) + .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) + sut.collector.handleConnection(sut.alloc, dataStream) + .timeout(timeout) + .block() + + println("Forwarded ${sink.count} msgs") + assertThat(sink.count) + .describedAs("should send up to number of events") + .isLessThan(numMessages) + } + } + + describe("test infrastructure") { + val digest = MessageDigest.getInstance("MD5") + + fun collectDigest(bb: ByteBuf) { + bb.markReaderIndex() + while (bb.isReadable) { + digest.update(bb.readByte()) + } + bb.resetReaderIndex() + } + + fun calculateDigest(arrays: List<ByteArray>): ByteArray { + for (array in arrays) { + digest.update(array) + } + return digest.digest() + } + + it("should yield same bytes as in the input") { + val numberOfBuffers = 10 + val singleBufferSize = 1000 + val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) } + val inputDigest = calculateDigest(arrays) + + val actualTotalSize = Flux.fromIterable(arrays) + .map { Unpooled.wrappedBuffer(it) } + .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) } + .doOnNext(::collectDigest) + .map { + val size = it.readableBytes() + it.release() + size + } + .sum() + .map(Long::toInt) + .block() + + val outputDigest = digest.digest() + + assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize) + assertThat(outputDigest).isEqualTo(inputDigest) + + } + } +}) + + +private const val ONE_MILION = 1_000_000.0 + +private val rand = Random() +private fun randomByteArray(size: Int): ByteArray { + val bytes = ByteArray(size) + rand.nextBytes(bytes) + return bytes +} + +fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> = + stream.index() + .filter { predicate(it.t1) } + .map { it.t2 } + +private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = + WireFrameEncoder(alloc).let { encoder -> + createMessageGenerator() + .createMessageFlux(params) + .map(encoder::encode) + .transform { simulateRemoteTcp(alloc, 1000, it) } + } + +private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) = + byteBuffers + .bufferTimeout(maxSize, Duration.ofMillis(250)) + .map { joinBuffers(alloc, it) } + .concatMap { randomlySplitTcpFrames(it) } + +private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) = + alloc.compositeBuffer().addComponents(true, it) + +private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { + val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt()) + return Flux.create<ByteBuf> { sink -> + while (bb.isReadable) { + val frameSize = Math.min(targetFrameSize, bb.readableBytes()) + sink.next(bb.retainedSlice(bb.readerIndex(), frameSize)) + bb.readerIndex(bb.readerIndex() + frameSize) + } + bb.release() + sink.complete() + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 5099ae4c..44b3266e 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -22,14 +22,14 @@ package org.onap.dcae.collectors.veshv.tests.component import io.netty.buffer.ByteBuf import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics -import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.Exceptions import reactor.core.publisher.Flux import java.time.Duration @@ -37,9 +37,9 @@ import java.time.Duration * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class Sut { +class Sut(sink: Sink = StoringSink()) { val configurationProvider = FakeConfigurationProvider() - val sink = FakeSink() + val alloc = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics) @@ -47,21 +47,9 @@ internal class Sut { val collector: Collector get() = collectorProvider() +} - fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - return sink.sentMessages - } - - fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> = - try { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - Pair(sink.sentMessages, null) - } catch (ex: Exception) { - Pair(sink.sentMessages, ex) - } - - companion object { - val logger = Logger(Sut::class) - } +fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + return sink.sentMessages } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 08b6382d..08450598 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,8 +23,10 @@ import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -34,9 +36,11 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain */ object VesHvSpecification : Spek({ describe("VES High Volume Collector") { - system("should handle multiple HV RAN events") { sut -> + it("should handle multiple HV RAN events") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) assertThat(messages) .describedAs("should send all events") @@ -46,18 +50,18 @@ object VesHvSpecification : Spek({ describe("Memory management") { - system("should release memory for each handled and dropped message") { sut -> + it("should release memory for each handled and dropped message") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) val msgWithInvalidFrame = invalidWireFrame() val expectedRefCnt = 0 - val (handledEvents, exception) = sut.handleConnectionReturningError( - validMessage, msgWithInvalidDomain, msgWithInvalidFrame) + val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame) assertThat(handledEvents).hasSize(1) - assertThat(exception).isNull() assertThat(validMessage.refCnt()) .describedAs("handled message should be released") @@ -71,17 +75,17 @@ object VesHvSpecification : Spek({ } - system("should release memory for each message with invalid payload") { sut -> + it("should release memory for each message with invalid payload") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidPayload = invalidVesMessage() val expectedRefCnt = 0 - val (handledEvents, exception) = sut.handleConnectionReturningError( - validMessage, msgWithInvalidPayload) + val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload) assertThat(handledEvents).hasSize(1) - assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") @@ -92,18 +96,17 @@ object VesHvSpecification : Spek({ } - system("should release memory for each message with garbage frame") { sut -> + it("should release memory for each message with garbage frame") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 - val (handledEvents, exception) = sut.handleConnectionReturningError( - validMessage, msgWithGarbageFrame) + val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame) assertThat(handledEvents).hasSize(1) - assertThat(exception?.cause) - .isInstanceOf(InvalidWireFrameMarkerException::class.java) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") @@ -116,10 +119,12 @@ object VesHvSpecification : Spek({ } describe("message routing") { - system("should direct message to a topic by means of routing configuration") { sut -> + it("should direct message to a topic by means of routing configuration") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] @@ -127,9 +132,11 @@ object VesHvSpecification : Spek({ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } - system("should drop message if route was not found") { sut -> + it("should drop message if route was not found") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection( + val messages = sut.handleConnection(sink, vesMessage(Domain.OTHER, "first"), vesMessage(Domain.HVRANMEAS, "second"), vesMessage(Domain.HEARTBEAT, "third")) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt deleted file mode 100644 index 9ede62a3..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import org.jetbrains.spek.api.dsl.Pending -import org.jetbrains.spek.api.dsl.TestContainer - -internal fun TestContainer.system(description: String, body: (Sut) -> Unit) { - test("system $description", body = { body(Sut()) }) -} - -internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) { - test("system $description", Pending.Yes(reason), body = { body(Sut()) }) -} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt index 3314c44f..8895d642 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -25,7 +25,7 @@ import io.netty.buffer.PooledByteBufAllocator import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import java.util.UUID +import java.util.* val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT @@ -61,7 +61,7 @@ fun invalidWireFrame() = allocator.buffer().run { writeByte(0x01) // content type = GPB } -fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = +fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) = VesEvent.newBuilder() .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index b0dbd0f5..a5fd546a 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -21,16 +21,16 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage import reactor.core.publisher.Flux import java.util.* import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class FakeSink : Sink { +class StoringSink : Sink { private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() val sentMessages: List<RoutedMessage> @@ -40,3 +40,20 @@ class FakeSink : Sink { return messages.doOnNext(sent::addLast) } } + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class CountingSink : Sink { + private val atomicCount = AtomicLong(0) + + val count: Long + get() = atomicCount.get() + + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + return messages.doOnNext { + atomicCount.incrementAndGet() + } + } +} |