aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
blob: 61a9a3562186d1aecde9529bcb0b47f6a51ded52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018-2019 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.dcae.collectors.veshv.tests.component

import arrow.syntax.function.partially1
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.buffer.Unpooled
import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
import java.time.Duration
import java.util.*
import kotlin.system.measureTimeMillis

/**
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object PerformanceSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector performance") {
        it("should handle multiple clients in reasonable time") {
            val sink = CountingSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicRouting)

            val numMessages: Long = 300_000
            val runs = 4
            val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())

            val params = VesEventParameters(
                    commonEventHeader = commonHeader(PERF3GPP),
                    messageType = VesEventType.VALID,
                    amount = numMessages
            )

            val fluxes = (1.rangeTo(runs)).map {
                sut.collector.handleConnection(generateDataStream(sut.alloc, params))
            }
            val durationMs = measureTimeMillis {
                Flux.merge(fluxes).then().block(timeout)
            }

            val durationSec = durationMs / 1000.0
            val throughput = sink.count / durationSec
            logger.info { "Processed $runs connections each containing $numMessages msgs." }
            logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" }
            assertThat(sink.count)
                    .describedAs("should send all events")
                    .isEqualTo(runs * numMessages)
        }

        it("should disconnect on transmission errors") {
            val sink = CountingSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicRouting)

            val numMessages: Long = 100_000
            val timeout = Duration.ofSeconds(30)

            val params = VesEventParameters(
                    commonEventHeader = commonHeader(PERF3GPP),
                    messageType = VesEventType.VALID,
                    amount = numMessages
            )

            val dataStream = generateDataStream(sut.alloc, params)
                    .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
            sut.collector.handleConnection(dataStream)
                    .timeout(timeout)
                    .block()

            logger.info { "Forwarded ${sink.count} msgs" }
            assertThat(sink.count)
                    .describedAs("should send up to number of events")
                    .isLessThan(numMessages)
        }
    }

    describe("test infrastructure") {
        val digest = MessageDigest.getInstance("MD5")

        fun collectDigest(bb: ByteBuf) {
            bb.markReaderIndex()
            while (bb.isReadable) {
                digest.update(bb.readByte())
            }
            bb.resetReaderIndex()
        }

        fun calculateDigest(arrays: List<ByteArray>): ByteArray {
            for (array in arrays) {
                digest.update(array)
            }
            return digest.digest()
        }

        it("should yield same bytes as in the input") {
            val numberOfBuffers = 10
            val singleBufferSize = 1000
            val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
            val inputDigest = calculateDigest(arrays)

            val actualTotalSize = Flux.fromIterable(arrays)
                    .map { Unpooled.wrappedBuffer(it) }
                    .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
                    .doOnNext(::collectDigest)
                    .map {
                        val size = it.readableBytes()
                        it.release()
                        size
                    }
                    .sum()
                    .map(Long::toInt)
                    .block()

            val outputDigest = digest.digest()

            assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
            assertThat(outputDigest).isEqualTo(inputDigest)

        }
    }
})


private const val ONE_MILLION = 1_000_000.0
private val rand = Random()
private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)

private fun randomByteArray(size: Int): ByteArray {
    val bytes = ByteArray(size)
    rand.nextBytes(bytes)
    return bytes
}

fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
        stream.index()
                .filter { predicate(it.t1) }
                .map { it.t2 }

private fun generateDataStream(alloc: ByteBufAllocator, params: VesEventParameters): Flux<ByteBuf> =
        WireFrameEncoder(alloc).let { encoder ->
            generatorsFactory.createVesEventGenerator()
                    .createMessageFlux(params)
                    .map { WireFrameMessage(it.toByteArray()) }
                    .map(encoder::encode)
                    .transform { simulateRemoteTcp(alloc, 1000, it) }
        }

private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
        byteBuffers
                .bufferTimeout(maxSize, Duration.ofMillis(250))
                .map { joinBuffers(alloc, it) }
                .concatMap { randomlySplitTcpFrames(it) }

private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
        alloc.compositeBuffer().addComponents(true, it)

private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
    val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
    return Flux.create<ByteBuf> { sink ->
        while (bb.isReadable) {
            val frameSize = Math.min(targetFrameSize, bb.readableBytes())
            sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
            bb.readerIndex(bb.readerIndex() + frameSize)
        }
        bb.release()
        sink.complete()
    }
}