diff options
20 files changed, 401 insertions, 98 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt index 9ede62a3..f4c92fd4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt @@ -17,15 +17,16 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.tests.component +package org.onap.dcae.collectors.veshv.simulators.xnf.api -import org.jetbrains.spek.api.dsl.Pending -import org.jetbrains.spek.api.dsl.TestContainer +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters +import reactor.core.publisher.Flux -internal fun TestContainer.system(description: String, body: (Sut) -> Unit) { - test("system $description", body = { body(Sut()) }) -} - -internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) { - test("system $description", Pending.Yes(reason), body = { body(Sut()) }) +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +interface MessageGenerator { + fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt index ed96e6c3..657ed317 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt @@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -data class ClientConfiguration( +internal data class ClientConfiguration( val vesHost: String, val vesPort: Int, val security: SecurityConfiguration, diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt new file mode 100644 index 00000000..dce386b1 --- /dev/null +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt @@ -0,0 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf + +import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageGeneratorImpl +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.PayloadGenerator + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +fun createMessageGenerator(): MessageGenerator = MessageGeneratorImpl(PayloadGenerator()) diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index 3f872b51..c545ac8d 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -37,7 +37,7 @@ import javax.json.JsonObject * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class HttpServer(private val vesClient: VesHvClient) { +internal class HttpServer(private val vesClient: VesHvClient) { fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO { RatpackServer.start { server -> @@ -69,7 +69,7 @@ class HttpServer(private val vesClient: VesHvClient) { return ctx.request.body .map { Json.createReader(it.inputStream).readObject() } .map { extractMessageParameters(it) } - .map { MessageFactory.INSTANCE.createMessageFlux(it) } + .map { MessageGeneratorImpl.INSTANCE.createMessageFlux(it) } } private fun sendAcceptedResponse(ctx: Context) { @@ -95,7 +95,7 @@ class HttpServer(private val vesClient: VesHvClient) { private fun extractMessageParameters(request: JsonObject): MessageParameters = try { - val commonEventHeader = MessageFactory.INSTANCE + val commonEventHeader = MessageGeneratorImpl.INSTANCE .parseCommonHeader(request.getJsonObject("commonEventHeader")) val messagesAmount = request.getJsonNumber("messagesAmount").longValue() MessageParameters(commonEventHeader, messagesAmount) diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt index f731e11c..0d28bad0 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -32,9 +33,9 @@ import javax.json.JsonObject * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class MessageFactory(private val payloadGenerator: PayloadGenerator) { +internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> = + override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> = Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { if (messageParameters.amount < 0) it.repeat() @@ -65,16 +66,14 @@ class MessageFactory(private val payloadGenerator: PayloadGenerator) { WireFrame(vesMessageBytes(commonHeader)) - private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray { - val msg = VesEvent.newBuilder() - .setCommonEventHeader(commonHeader) - .setHvRanMeasFields(PayloadGenerator().generatePayload().toByteString()) - .build() - - return msg.toByteArray() - } + private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = + VesEvent.newBuilder() + .setCommonEventHeader(commonHeader) + .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString()) + .build() + .toByteArray() companion object { - val INSTANCE = MessageFactory(PayloadGenerator()) + val INSTANCE = MessageGeneratorImpl(PayloadGenerator()) } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt index 17dbbf41..c8b97639 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt @@ -22,9 +22,9 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas -import java.util.Random +import java.util.* -class PayloadGenerator { +internal class PayloadGenerator { private val randomGenerator = Random() diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index be351b50..43b73e1f 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -31,7 +31,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher -import reactor.core.publisher.EmitterProcessor import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.ReplayProcessor @@ -43,7 +42,7 @@ import reactor.ipc.netty.tcp.TcpClient * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class VesHvClient(private val configuration: ClientConfiguration) { +internal class VesHvClient(private val configuration: ClientConfiguration) { private val client: TcpClient = TcpClient.builder() .options { opts -> diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index f2229507..dbeba2b2 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf import arrow.core.Failure import arrow.core.Success -import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain import org.onap.dcae.collectors.veshv.utils.logging.Logger diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt index 2f592641..6f8a95a4 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt @@ -40,7 +40,7 @@ const val SAMPLE_LAST_EPOCH: Long = 120034455 object MessageFactoryTest : Spek({ describe("message factory") { - val factory = MessageFactory.INSTANCE + val factory = MessageGeneratorImpl.INSTANCE given("only common header") { it("should return infinite flux") { diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 033095ad..3246cf59 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -59,21 +59,28 @@ internal class VesHvCollector( .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .onErrorResume(this::handleErrors) .then() } private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { - wireChunkDecoder.release() - } - private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> = mapper(input).fold( { Mono.empty() }, { Mono.just(it) }) + private fun handleErrors(ex: Throwable): Flux<RoutedMessage> { + logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})") + logger.debug("Detailed stack trace", ex) + return Flux.empty() + } + + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { + wireChunkDecoder.release() + } + companion object { - val logger = Logger(VesHvCollector::class) + private val logger = Logger(VesHvCollector::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 056e0557..cfb61b3e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -31,9 +31,40 @@ import reactor.core.publisher.Flux * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class WireChunkDecoder(private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { +internal class WireChunkDecoder( + private val decoder: WireFrameDecoder, + alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() + +// TODO: use this implementation and cleanup the rest +// fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> { +// if (byteBuf.readableBytes() == 0) { +// byteBuf.release() +// Flux.empty() +// } else { +// streamBuffer.addComponent(true, byteBuf) +// Flux.generate { next -> +// try { +// val frame = decodeFirstFrameFromBuffer() +// if (frame == null) +// next.complete() +// else +// next.next(frame) +// } catch (ex: Exception) { +// next.error(ex) +// } +// } +// } +// }.doOnTerminate { streamBuffer.discardReadComponents() } +// +// +// private fun decodeFirstFrameFromBuffer(): WireFrame? = +// try { +// decoder.decodeFirst(streamBuffer) +// } catch (ex: MissingWireFrameBytesException) { +// logger.trace { "${ex.message} - waiting for more data" } +// null +// } fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter .createFlux(decoder, streamBuffer, byteBuf) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index abebff3d..540c647a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -35,24 +35,27 @@ internal class WireFrameSink( private val streamBuffer: ByteBuf, private val sink: FluxSink<WireFrame>, private val requestedFrameCount: Long) { + private var completed = false fun handleSubscriber() { - logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + if (!completed) { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } - try { - if (requestedFrameCount == Long.MAX_VALUE) { - logger.trace { "Push based strategy" } - pushAvailableFrames() - } else { - logger.trace { "Pull based strategy - req $requestedFrameCount" } - pushUpToNumberOfFrames() + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + completed = true + sink.error(ex) } - } catch (ex: Exception) { - sink.error(ex) - } - - logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + } } private fun pushAvailableFrames() { @@ -61,6 +64,7 @@ internal class WireFrameSink( sink.next(nextFrame) nextFrame = decodeFirstFrameFromBuffer() } + completed = true sink.complete() } @@ -76,6 +80,7 @@ internal class WireFrameSink( } } if (remaining > 0 && nextFrame == null) { + completed = true sink.complete() } } diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml index 1db0345c..63a5c093 100644 --- a/hv-collector-ct/pom.xml +++ b/hv-collector-ct/pom.xml @@ -64,8 +64,17 @@ <artifactId>hv-collector-core</artifactId> <version>${project.parent.version}</version> </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-client-simulator</artifactId> + <version>${project.parent.version}</version> + </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-syntax</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt new file mode 100644 index 00000000..c68f0514 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -0,0 +1,193 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import arrow.syntax.function.partially1 +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters +import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator +import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import reactor.core.publisher.Flux +import reactor.math.sum +import java.security.MessageDigest +import java.time.Duration +import java.util.* +import kotlin.system.measureTimeMillis + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object PerformanceSpecification : Spek({ + describe("VES High Volume Collector performance") { + it("should handle multiple clients in reasonable time") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 300_000 + val runs = 4 + val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) + + val params = MessageParameters( + commonEventHeader = vesEvent().commonEventHeader, + amount = numMessages) + val fluxes = (1.rangeTo(runs)).map { + sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + } + val durationMs = measureTimeMillis { + Flux.merge(fluxes).then().block(timeout) + } + + val durationSec = durationMs / 1000.0 + val throughput = sink.count / durationSec + println("Processed $runs connections each containing $numMessages msgs.") + println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + assertThat(sink.count) + .describedAs("should send all events") + .isEqualTo(runs * numMessages) + } + + it("should disconnect on transmission errors") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 100_000 + val timeout = Duration.ofSeconds(30) + + val params = MessageParameters( + commonEventHeader = vesEvent().commonEventHeader, + amount = numMessages) + + val dataStream = generateDataStream(sut.alloc, params) + .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) + sut.collector.handleConnection(sut.alloc, dataStream) + .timeout(timeout) + .block() + + println("Forwarded ${sink.count} msgs") + assertThat(sink.count) + .describedAs("should send up to number of events") + .isLessThan(numMessages) + } + } + + describe("test infrastructure") { + val digest = MessageDigest.getInstance("MD5") + + fun collectDigest(bb: ByteBuf) { + bb.markReaderIndex() + while (bb.isReadable) { + digest.update(bb.readByte()) + } + bb.resetReaderIndex() + } + + fun calculateDigest(arrays: List<ByteArray>): ByteArray { + for (array in arrays) { + digest.update(array) + } + return digest.digest() + } + + it("should yield same bytes as in the input") { + val numberOfBuffers = 10 + val singleBufferSize = 1000 + val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) } + val inputDigest = calculateDigest(arrays) + + val actualTotalSize = Flux.fromIterable(arrays) + .map { Unpooled.wrappedBuffer(it) } + .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) } + .doOnNext(::collectDigest) + .map { + val size = it.readableBytes() + it.release() + size + } + .sum() + .map(Long::toInt) + .block() + + val outputDigest = digest.digest() + + assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize) + assertThat(outputDigest).isEqualTo(inputDigest) + + } + } +}) + + +private const val ONE_MILION = 1_000_000.0 + +private val rand = Random() +private fun randomByteArray(size: Int): ByteArray { + val bytes = ByteArray(size) + rand.nextBytes(bytes) + return bytes +} + +fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> = + stream.index() + .filter { predicate(it.t1) } + .map { it.t2 } + +private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = + WireFrameEncoder(alloc).let { encoder -> + createMessageGenerator() + .createMessageFlux(params) + .map(encoder::encode) + .transform { simulateRemoteTcp(alloc, 1000, it) } + } + +private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) = + byteBuffers + .bufferTimeout(maxSize, Duration.ofMillis(250)) + .map { joinBuffers(alloc, it) } + .concatMap { randomlySplitTcpFrames(it) } + +private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) = + alloc.compositeBuffer().addComponents(true, it) + +private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { + val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt()) + return Flux.create<ByteBuf> { sink -> + while (bb.isReadable) { + val frameSize = Math.min(targetFrameSize, bb.readableBytes()) + sink.next(bb.retainedSlice(bb.readerIndex(), frameSize)) + bb.readerIndex(bb.readerIndex() + frameSize) + } + bb.release() + sink.complete() + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 5099ae4c..44b3266e 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -22,14 +22,14 @@ package org.onap.dcae.collectors.veshv.tests.component import io.netty.buffer.ByteBuf import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics -import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.Exceptions import reactor.core.publisher.Flux import java.time.Duration @@ -37,9 +37,9 @@ import java.time.Duration * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class Sut { +class Sut(sink: Sink = StoringSink()) { val configurationProvider = FakeConfigurationProvider() - val sink = FakeSink() + val alloc = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics) @@ -47,21 +47,9 @@ internal class Sut { val collector: Collector get() = collectorProvider() +} - fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - return sink.sentMessages - } - - fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> = - try { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - Pair(sink.sentMessages, null) - } catch (ex: Exception) { - Pair(sink.sentMessages, ex) - } - - companion object { - val logger = Logger(Sut::class) - } +fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + return sink.sentMessages } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 08b6382d..08450598 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,8 +23,10 @@ import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -34,9 +36,11 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain */ object VesHvSpecification : Spek({ describe("VES High Volume Collector") { - system("should handle multiple HV RAN events") { sut -> + it("should handle multiple HV RAN events") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) assertThat(messages) .describedAs("should send all events") @@ -46,18 +50,18 @@ object VesHvSpecification : Spek({ describe("Memory management") { - system("should release memory for each handled and dropped message") { sut -> + it("should release memory for each handled and dropped message") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) val msgWithInvalidFrame = invalidWireFrame() val expectedRefCnt = 0 - val (handledEvents, exception) = sut.handleConnectionReturningError( - validMessage, msgWithInvalidDomain, msgWithInvalidFrame) + val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame) assertThat(handledEvents).hasSize(1) - assertThat(exception).isNull() assertThat(validMessage.refCnt()) .describedAs("handled message should be released") @@ -71,17 +75,17 @@ object VesHvSpecification : Spek({ } - system("should release memory for each message with invalid payload") { sut -> + it("should release memory for each message with invalid payload") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidPayload = invalidVesMessage() val expectedRefCnt = 0 - val (handledEvents, exception) = sut.handleConnectionReturningError( - validMessage, msgWithInvalidPayload) + val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload) assertThat(handledEvents).hasSize(1) - assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") @@ -92,18 +96,17 @@ object VesHvSpecification : Spek({ } - system("should release memory for each message with garbage frame") { sut -> + it("should release memory for each message with garbage frame") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 - val (handledEvents, exception) = sut.handleConnectionReturningError( - validMessage, msgWithGarbageFrame) + val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame) assertThat(handledEvents).hasSize(1) - assertThat(exception?.cause) - .isInstanceOf(InvalidWireFrameMarkerException::class.java) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") @@ -116,10 +119,12 @@ object VesHvSpecification : Spek({ } describe("message routing") { - system("should direct message to a topic by means of routing configuration") { sut -> + it("should direct message to a topic by means of routing configuration") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] @@ -127,9 +132,11 @@ object VesHvSpecification : Spek({ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } - system("should drop message if route was not found") { sut -> + it("should drop message if route was not found") { + val sink = StoringSink() + val sut = Sut(sink) sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection( + val messages = sut.handleConnection(sink, vesMessage(Domain.OTHER, "first"), vesMessage(Domain.HVRANMEAS, "second"), vesMessage(Domain.HEARTBEAT, "third")) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt index 3314c44f..8895d642 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -25,7 +25,7 @@ import io.netty.buffer.PooledByteBufAllocator import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import java.util.UUID +import java.util.* val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT @@ -61,7 +61,7 @@ fun invalidWireFrame() = allocator.buffer().run { writeByte(0x01) // content type = GPB } -fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = +fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) = VesEvent.newBuilder() .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index b0dbd0f5..a5fd546a 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -21,16 +21,16 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage import reactor.core.publisher.Flux import java.util.* import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class FakeSink : Sink { +class StoringSink : Sink { private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() val sentMessages: List<RoutedMessage> @@ -40,3 +40,20 @@ class FakeSink : Sink { return messages.doOnNext(sent::addLast) } } + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class CountingSink : Sink { + private val atomicCount = AtomicLong(0) + + val count: Long + get() = atomicCount.get() + + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + return messages.doOnNext { + atomicCount.incrementAndGet() + } + } +} diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml index 84abc9d3..93f22771 100644 --- a/hv-collector-ct/src/test/resources/logback-test.xml +++ b/hv-collector-ct/src/test/resources/logback-test.xml @@ -26,10 +26,10 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> <appender-ref ref="ROLLING-FILE"/> </root> -</configuration>
\ No newline at end of file +</configuration> diff --git a/req.json b/req.json new file mode 100644 index 00000000..e092ed66 --- /dev/null +++ b/req.json @@ -0,0 +1,20 @@ +{ + "commonEventHeader": { + "version": "sample-version", + "domain": 10, + "sequence": 1, + "priority": 1, + "eventId": "sample-event-id", + "eventName": "sample-event-name", + "eventType": "sample-event-type", + "startEpochMicrosec": 120034455, + "lastEpochMicrosec": 120034455, + "nfNamingCode": "sample-nf-naming-code", + "nfcNamingCode": "sample-nfc-naming-code", + "reportingEntityId": "sample-reporting-entity-id", + "reportingEntityName": "sample-reporting-entity-name", + "sourceId": "sample-source-id", + "sourceName": "sample-source-name" + }, + "messagesAmount": 1000000 +} |