diff options
Diffstat (limited to 'sources/hv-collector-ct')
10 files changed, 1045 insertions, 0 deletions
diff --git a/sources/hv-collector-ct/pom.xml b/sources/hv-collector-ct/pom.xml new file mode 100644 index 00000000..61ac426b --- /dev/null +++ b/sources/hv-collector-ct/pom.xml @@ -0,0 +1,115 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-ct</artifactId> + <description>VES HighVolume Collector :: Component tests</description> + + <properties> + <failIfMissingUnitTests>false</failIfMissingUnitTests> + <failIfMissingComponentTests>true</failIfMissingComponentTests> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-xnf-simulator</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + + + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-syntax</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + </dependency> + </dependencies> + + +</project>
\ No newline at end of file diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt new file mode 100644 index 00000000..0897e910 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -0,0 +1,203 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import arrow.syntax.function.partially1 +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import reactor.core.publisher.Flux +import reactor.math.sum +import java.security.MessageDigest +import java.time.Duration +import java.util.* +import kotlin.system.measureTimeMillis + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object PerformanceSpecification : Spek({ + debugRx(false) + + describe("VES High Volume Collector performance") { + it("should handle multiple clients in reasonable time") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 300_000 + val runs = 4 + val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) + + val params = MessageParameters( + commonEventHeader = commonHeader(PERF3GPP), + messageType = VALID, + amount = numMessages + ) + + val fluxes = (1.rangeTo(runs)).map { + sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) + } + val durationMs = measureTimeMillis { + Flux.merge(fluxes).then().block(timeout) + } + + val durationSec = durationMs / 1000.0 + val throughput = sink.count / durationSec + logger.info("Processed $runs connections each containing $numMessages msgs.") + logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + assertThat(sink.count) + .describedAs("should send all events") + .isEqualTo(runs * numMessages) + } + + it("should disconnect on transmission errors") { + val sink = CountingSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val numMessages: Long = 100_000 + val timeout = Duration.ofSeconds(30) + + val params = MessageParameters( + commonEventHeader = commonHeader(PERF3GPP), + messageType = VALID, + amount = numMessages + ) + + val dataStream = generateDataStream(sut.alloc, params) + .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) + sut.collector.handleConnection(sut.alloc, dataStream) + .timeout(timeout) + .block() + + logger.info("Forwarded ${sink.count} msgs") + assertThat(sink.count) + .describedAs("should send up to number of events") + .isLessThan(numMessages) + } + } + + describe("test infrastructure") { + val digest = MessageDigest.getInstance("MD5") + + fun collectDigest(bb: ByteBuf) { + bb.markReaderIndex() + while (bb.isReadable) { + digest.update(bb.readByte()) + } + bb.resetReaderIndex() + } + + fun calculateDigest(arrays: List<ByteArray>): ByteArray { + for (array in arrays) { + digest.update(array) + } + return digest.digest() + } + + it("should yield same bytes as in the input") { + val numberOfBuffers = 10 + val singleBufferSize = 1000 + val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) } + val inputDigest = calculateDigest(arrays) + + val actualTotalSize = Flux.fromIterable(arrays) + .map { Unpooled.wrappedBuffer(it) } + .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) } + .doOnNext(::collectDigest) + .map { + val size = it.readableBytes() + it.release() + size + } + .sum() + .map(Long::toInt) + .block() + + val outputDigest = digest.digest() + + assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize) + assertThat(outputDigest).isEqualTo(inputDigest) + + } + } +}) + + +private const val ONE_MILION = 1_000_000.0 + +private val rand = Random() +private fun randomByteArray(size: Int): ByteArray { + val bytes = ByteArray(size) + rand.nextBytes(bytes) + return bytes +} + +fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> = + stream.index() + .filter { predicate(it.t1) } + .map { it.t2 } + +private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = + WireFrameEncoder(alloc).let { encoder -> + MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES) + .createMessageFlux(listOf(params)) + .map(encoder::encode) + .transform { simulateRemoteTcp(alloc, 1000, it) } + } + +private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) = + byteBuffers + .bufferTimeout(maxSize, Duration.ofMillis(250)) + .map { joinBuffers(alloc, it) } + .concatMap { randomlySplitTcpFrames(it) } + +private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) = + alloc.compositeBuffer().addComponents(true, it) + +private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { + val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt()) + return Flux.create<ByteBuf> { sink -> + while (bb.isReadable) { + val frameSize = Math.min(targetFrameSize, bb.readableBytes()) + sink.next(bb.retainedSlice(bb.readerIndex(), frameSize)) + bb.readerIndex(bb.readerIndex() + frameSize) + } + bb.release() + sink.complete() + } +} + diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt new file mode 100644 index 00000000..0495ced5 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import arrow.core.getOrElse +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.UnpooledByteBufAllocator +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState +import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import reactor.core.publisher.Flux +import java.time.Duration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class Sut(sink: Sink = StoringSink()) { + val configurationProvider = FakeConfigurationProvider() + val healthStateProvider = FakeHealthState() + + val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT + private val metrics = FakeMetrics() + private val collectorFactory = CollectorFactory( + configurationProvider, + SinkProvider.just(sink), + metrics, + MAX_PAYLOAD_SIZE_BYTES, + healthStateProvider) + private val collectorProvider = collectorFactory.createVesHvCollectorProvider() + + val collector: Collector + get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } + + companion object { + const val MAX_PAYLOAD_SIZE_BYTES = 1024 + } + +} + +fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + return sink.sentMessages +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt new file mode 100644 index 00000000..2d81c671 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -0,0 +1,347 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame +import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame +import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload + +import reactor.core.publisher.Flux +import java.time.Duration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object VesHvSpecification : Spek({ + debugRx(false) + + describe("VES High Volume Collector") { + it("should handle multiple HV RAN events") { + val (sut, sink) = vesHvWithStoringSink() + val messages = sut.handleConnection(sink, + vesWireFrameMessage(PERF3GPP), + vesWireFrameMessage(PERF3GPP) + ) + + assertThat(messages) + .describedAs("should send all events") + .hasSize(2) + } + } + + describe("Memory management") { + it("should release memory for each handled and dropped message") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesWireFrameMessage(PERF3GPP) + val msgWithInvalidFrame = invalidWireFrame() + val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection( + sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload) + + assertThat(handledEvents).hasSize(1) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithInvalidFrame.refCnt()) + .describedAs("message with invalid frame should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithTooBigPayload.refCnt()) + .describedAs("message with payload exceeding 1MiB should be released") + .isEqualTo(expectedRefCnt) + } + + it("should release memory for each message with invalid payload") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesWireFrameMessage(PERF3GPP) + val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload() + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload) + + assertThat(handledEvents).hasSize(1) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(expectedRefCnt) + + } + + it("should release memory for each message with garbage frame") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesWireFrameMessage(PERF3GPP) + val msgWithGarbageFrame = garbageFrame() + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame) + + assertThat(handledEvents).hasSize(1) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithGarbageFrame.refCnt()) + .describedAs("message with garbage frame should be released") + .isEqualTo(expectedRefCnt) + + } + } + + describe("message routing") { + it("should direct message to a topic by means of routing configuration") { + val (sut, sink) = vesHvWithStoringSink() + + val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) + } + + it("should be able to direct 2 messages from different domains to one topic") { + val (sut, sink) = vesHvWithStoringSink() + + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + + val messages = sut.handleConnection(sink, + vesWireFrameMessage(PERF3GPP), + vesWireFrameMessage(HEARTBEAT), + vesWireFrameMessage(MEASUREMENT)) + + assertThat(messages).describedAs("number of routed messages").hasSize(3) + + assertThat(messages[0].topic).describedAs("first message topic") + .isEqualTo(PERF3GPP_TOPIC) + + assertThat(messages[1].topic).describedAs("second message topic") + .isEqualTo(PERF3GPP_TOPIC) + + assertThat(messages[2].topic).describedAs("last message topic") + .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + } + + it("should drop message if route was not found") { + val (sut, sink) = vesHvWithStoringSink() + val messages = sut.handleConnection(sink, + vesWireFrameMessage(OTHER, "first"), + vesWireFrameMessage(PERF3GPP, "second"), + vesWireFrameMessage(HEARTBEAT, "third")) + + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") + } + } + + describe("configuration update") { + + val defaultTimeout = Duration.ofSeconds(10) + + given("successful configuration change") { + + lateinit var sut: Sut + lateinit var sink: StoringSink + + beforeEachTest { + vesHvWithStoringSink().run { + sut = first + sink = second + } + } + + it("should update collector") { + val firstCollector = sut.collector + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + val collectorAfterUpdate = sut.collector + + assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + } + + it("should start routing messages") { + + sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + + val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messages).isEmpty() + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messagesAfterUpdate).hasSize(1) + val message = messagesAfterUpdate[0] + + assertThat(message.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(PERF3GPP_TOPIC) + assertThat(message.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should change domain routing") { + + val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messages).hasSize(1) + val firstMessage = messages[0] + + assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + .isEqualTo(PERF3GPP_TOPIC) + assertThat(firstMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messagesAfterUpdate).hasSize(2) + val secondMessage = messagesAfterUpdate[1] + + assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) + assertThat(secondMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should update routing for each client sending one message") { + + val messagesAmount = 10 + val messagesForEachTopic = 5 + + Flux.range(0, messagesAmount).doOnNext { + if (it == messagesForEachTopic) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + } + }.doOnNext { + sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + }.then().block(defaultTimeout) + + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + + assertThat(messages.size).isEqualTo(messagesAmount) + assertThat(messagesForEachTopic) + .describedAs("amount of messages routed to each topic") + .isEqualTo(firstTopicMessagesCount) + .isEqualTo(secondTopicMessagesCount) + } + + it("should not update routing for client sending continuous stream of messages") { + + val messageStreamSize = 10 + val pivot = 5 + + val incomingMessages = Flux.range(0, messageStreamSize) + .doOnNext { + if (it == pivot) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + println("config changed") + } + } + .map { vesWireFrameMessage(PERF3GPP) } + + + sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + + assertThat(messages.size).isEqualTo(messageStreamSize) + assertThat(firstTopicMessagesCount) + .describedAs("amount of messages routed to first topic") + .isEqualTo(messageStreamSize) + + assertThat(secondTopicMessagesCount) + .describedAs("amount of messages routed to second topic") + .isEqualTo(0) + } + + it("should mark the application healthy") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthDescription.HEALTHY) + } + } + + given("failed configuration change") { + val (sut, _) = vesHvWithStoringSink() + sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + it("should mark the application unhealthy ") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) + } + } + } + + describe("request validation") { + it("should reject message with payload greater than 1 MiB and all subsequent messages") { + val (sut, sink) = vesHvWithStoringSink() + + val handledMessages = sut.handleConnection(sink, + vesWireFrameMessage(PERF3GPP, "first"), + vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), + vesWireFrameMessage(PERF3GPP)) + + assertThat(handledMessages).hasSize(1) + assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") + } + } + +}) + +private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { + val sink = StoringSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + return Pair(sut, sink) +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt new file mode 100644 index 00000000..29df8c70 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.jetbrains.spek.api.dsl.SpecBody +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Hooks + +fun SpecBody.debugRx(debug: Boolean = true) { + if (debug) { + beforeGroup { + Hooks.onOperatorDebug() + } + + afterGroup { + Hooks.resetOnOperatorDebug() + } + } +} + +val logger = Logger("org.onap.dcae.collectors.veshv.tests.component") diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt new file mode 100644 index 00000000..c25771b7 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import reactor.core.publisher.Flux + +class FakeHealthState : HealthState { + + lateinit var currentHealth: HealthDescription + + override fun changeState(healthDescription: HealthDescription) { + currentHealth = healthDescription + } + + override fun invoke(): Flux<HealthDescription> { + throw NotImplementedError() + } +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt new file mode 100644 index 00000000..3770913a --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -0,0 +1,106 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.routing + +import reactor.core.publisher.FluxProcessor +import reactor.core.publisher.UnicastProcessor +import reactor.retry.RetryExhaustedException + + +const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" +const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" +const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" + +val basicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + }.build() +) + +val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(HEARTBEAT.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(MEASUREMENT.domainName) + toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(ALTERNATE_PERF3GPP_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + }.build() +) + +class FakeConfigurationProvider : ConfigurationProvider { + private var shouldThrowException = false + private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() + + fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = + if (shouldThrowException) { + configStream.onError(RetryExhaustedException("I'm so tired")) + } else { + configStream.onNext(collectorConfiguration) + } + + + fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { + this.shouldThrowException = shouldThrowException + } + + override fun invoke() = configStream +} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt new file mode 100644 index 00000000..aa3fdc39 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Metrics + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class FakeMetrics: Metrics { + override fun notifyBytesReceived(size: Int) { + } + + override fun notifyMessageReceived(size: Int) { + } + + override fun notifyMessageSent(topic: String) { + } +}
\ No newline at end of file diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt new file mode 100644 index 00000000..a5fd546a --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import reactor.core.publisher.Flux +import java.util.* +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class StoringSink : Sink { + private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + + val sentMessages: List<RoutedMessage> + get() = sent.toList() + + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + return messages.doOnNext(sent::addLast) + } +} + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class CountingSink : Sink { + private val atomicCount = AtomicLong(0) + + val count: Long + get() = atomicCount.get() + + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + return messages.doOnNext { + atomicCount.incrementAndGet() + } + } +} diff --git a/sources/hv-collector-ct/src/test/resources/logback-test.xml b/sources/hv-collector-ct/src/test/resources/logback-test.xml new file mode 100644 index 00000000..93f22771 --- /dev/null +++ b/sources/hv-collector-ct/src/test/resources/logback-test.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration> |