aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-28 15:46:50 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-29 14:41:42 +0100
commitdde383a2aa75f94c26d7949665b79cc95486a223 (patch)
tree75f3e8f564067afd0e67dbe6254183e45ca26944 /sources/hv-collector-ct/src
parent77f896523f2065b1da1be21545155a29edea5122 (diff)
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg. concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}` Unfortunately to avoid defining dependencies in many places and having circural dependencies it was necessarry to reorganize the maven module structure. The goal was to have `sources` module with production code and `build` module with build-time tooling (detekt rules among them). Issue-ID: DCAEGEN2-1002 Change-Id: I36e677b98972aaae6905d722597cbce5e863d201 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt203
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt68
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt347
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt38
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt37
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt106
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt37
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt59
-rw-r--r--sources/hv-collector-ct/src/test/resources/logback-test.xml35
9 files changed, 930 insertions, 0 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
new file mode 100644
index 00000000..0897e910
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -0,0 +1,203 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import arrow.syntax.function.partially1
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.CompositeByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import reactor.core.publisher.Flux
+import reactor.math.sum
+import java.security.MessageDigest
+import java.time.Duration
+import java.util.*
+import kotlin.system.measureTimeMillis
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object PerformanceSpecification : Spek({
+ debugRx(false)
+
+ describe("VES High Volume Collector performance") {
+ it("should handle multiple clients in reasonable time") {
+ val sink = CountingSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val numMessages: Long = 300_000
+ val runs = 4
+ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
+
+ val params = MessageParameters(
+ commonEventHeader = commonHeader(PERF3GPP),
+ messageType = VALID,
+ amount = numMessages
+ )
+
+ val fluxes = (1.rangeTo(runs)).map {
+ sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+ }
+ val durationMs = measureTimeMillis {
+ Flux.merge(fluxes).then().block(timeout)
+ }
+
+ val durationSec = durationMs / 1000.0
+ val throughput = sink.count / durationSec
+ logger.info("Processed $runs connections each containing $numMessages msgs.")
+ logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+ assertThat(sink.count)
+ .describedAs("should send all events")
+ .isEqualTo(runs * numMessages)
+ }
+
+ it("should disconnect on transmission errors") {
+ val sink = CountingSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val numMessages: Long = 100_000
+ val timeout = Duration.ofSeconds(30)
+
+ val params = MessageParameters(
+ commonEventHeader = commonHeader(PERF3GPP),
+ messageType = VALID,
+ amount = numMessages
+ )
+
+ val dataStream = generateDataStream(sut.alloc, params)
+ .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
+ sut.collector.handleConnection(sut.alloc, dataStream)
+ .timeout(timeout)
+ .block()
+
+ logger.info("Forwarded ${sink.count} msgs")
+ assertThat(sink.count)
+ .describedAs("should send up to number of events")
+ .isLessThan(numMessages)
+ }
+ }
+
+ describe("test infrastructure") {
+ val digest = MessageDigest.getInstance("MD5")
+
+ fun collectDigest(bb: ByteBuf) {
+ bb.markReaderIndex()
+ while (bb.isReadable) {
+ digest.update(bb.readByte())
+ }
+ bb.resetReaderIndex()
+ }
+
+ fun calculateDigest(arrays: List<ByteArray>): ByteArray {
+ for (array in arrays) {
+ digest.update(array)
+ }
+ return digest.digest()
+ }
+
+ it("should yield same bytes as in the input") {
+ val numberOfBuffers = 10
+ val singleBufferSize = 1000
+ val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
+ val inputDigest = calculateDigest(arrays)
+
+ val actualTotalSize = Flux.fromIterable(arrays)
+ .map { Unpooled.wrappedBuffer(it) }
+ .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
+ .doOnNext(::collectDigest)
+ .map {
+ val size = it.readableBytes()
+ it.release()
+ size
+ }
+ .sum()
+ .map(Long::toInt)
+ .block()
+
+ val outputDigest = digest.digest()
+
+ assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
+ assertThat(outputDigest).isEqualTo(inputDigest)
+
+ }
+ }
+})
+
+
+private const val ONE_MILION = 1_000_000.0
+
+private val rand = Random()
+private fun randomByteArray(size: Int): ByteArray {
+ val bytes = ByteArray(size)
+ rand.nextBytes(bytes)
+ return bytes
+}
+
+fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
+ stream.index()
+ .filter { predicate(it.t1) }
+ .map { it.t2 }
+
+private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
+ WireFrameEncoder(alloc).let { encoder ->
+ MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
+ .createMessageFlux(listOf(params))
+ .map(encoder::encode)
+ .transform { simulateRemoteTcp(alloc, 1000, it) }
+ }
+
+private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
+ byteBuffers
+ .bufferTimeout(maxSize, Duration.ofMillis(250))
+ .map { joinBuffers(alloc, it) }
+ .concatMap { randomlySplitTcpFrames(it) }
+
+private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
+ alloc.compositeBuffer().addComponents(true, it)
+
+private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
+ val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
+ return Flux.create<ByteBuf> { sink ->
+ while (bb.isReadable) {
+ val frameSize = Math.min(targetFrameSize, bb.readableBytes())
+ sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
+ bb.readerIndex(bb.readerIndex() + frameSize)
+ }
+ bb.release()
+ sink.complete()
+ }
+}
+
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
new file mode 100644
index 00000000..0495ced5
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import arrow.core.getOrElse
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import reactor.core.publisher.Flux
+import java.time.Duration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class Sut(sink: Sink = StoringSink()) {
+ val configurationProvider = FakeConfigurationProvider()
+ val healthStateProvider = FakeHealthState()
+
+ val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
+ private val metrics = FakeMetrics()
+ private val collectorFactory = CollectorFactory(
+ configurationProvider,
+ SinkProvider.just(sink),
+ metrics,
+ MAX_PAYLOAD_SIZE_BYTES,
+ healthStateProvider)
+ private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
+
+ val collector: Collector
+ get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+
+ companion object {
+ const val MAX_PAYLOAD_SIZE_BYTES = 1024
+ }
+
+}
+
+fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+ collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ return sink.sentMessages
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
new file mode 100644
index 00000000..2d81c671
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -0,0 +1,347 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+
+import reactor.core.publisher.Flux
+import java.time.Duration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object VesHvSpecification : Spek({
+ debugRx(false)
+
+ describe("VES High Volume Collector") {
+ it("should handle multiple HV RAN events") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(PERF3GPP)
+ )
+
+ assertThat(messages)
+ .describedAs("should send all events")
+ .hasSize(2)
+ }
+ }
+
+ describe("Memory management") {
+ it("should release memory for each handled and dropped message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesWireFrameMessage(PERF3GPP)
+ val msgWithInvalidFrame = invalidWireFrame()
+ val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(
+ sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
+
+ assertThat(handledEvents).hasSize(1)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithInvalidFrame.refCnt())
+ .describedAs("message with invalid frame should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithTooBigPayload.refCnt())
+ .describedAs("message with payload exceeding 1MiB should be released")
+ .isEqualTo(expectedRefCnt)
+ }
+
+ it("should release memory for each message with invalid payload") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesWireFrameMessage(PERF3GPP)
+ val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
+
+ assertThat(handledEvents).hasSize(1)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithInvalidPayload.refCnt())
+ .describedAs("message with invalid payload should be released")
+ .isEqualTo(expectedRefCnt)
+
+ }
+
+ it("should release memory for each message with garbage frame") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesWireFrameMessage(PERF3GPP)
+ val msgWithGarbageFrame = garbageFrame()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
+
+ assertThat(handledEvents).hasSize(1)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithGarbageFrame.refCnt())
+ .describedAs("message with garbage frame should be released")
+ .isEqualTo(expectedRefCnt)
+
+ }
+ }
+
+ describe("message routing") {
+ it("should direct message to a topic by means of routing configuration") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messages).describedAs("number of routed messages").hasSize(1)
+
+ val msg = messages[0]
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+ }
+
+ it("should be able to direct 2 messages from different domains to one topic") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(HEARTBEAT),
+ vesWireFrameMessage(MEASUREMENT))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+ assertThat(messages[0].topic).describedAs("first message topic")
+ .isEqualTo(PERF3GPP_TOPIC)
+
+ assertThat(messages[1].topic).describedAs("second message topic")
+ .isEqualTo(PERF3GPP_TOPIC)
+
+ assertThat(messages[2].topic).describedAs("last message topic")
+ .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ }
+
+ it("should drop message if route was not found") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(OTHER, "first"),
+ vesWireFrameMessage(PERF3GPP, "second"),
+ vesWireFrameMessage(HEARTBEAT, "third"))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(1)
+
+ val msg = messages[0]
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
+ }
+ }
+
+ describe("configuration update") {
+
+ val defaultTimeout = Duration.ofSeconds(10)
+
+ given("successful configuration change") {
+
+ lateinit var sut: Sut
+ lateinit var sink: StoringSink
+
+ beforeEachTest {
+ vesHvWithStoringSink().run {
+ sut = first
+ sink = second
+ }
+ }
+
+ it("should update collector") {
+ val firstCollector = sut.collector
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
+
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+ }
+
+ it("should start routing messages") {
+
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messages).isEmpty()
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
+
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(PERF3GPP_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should change domain routing") {
+
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
+
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(PERF3GPP_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
+
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should update routing for each client sending one message") {
+
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
+
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ }.then().block(defaultTimeout)
+
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
+
+ it("should not update routing for client sending continuous stream of messages") {
+
+ val messageStreamSize = 10
+ val pivot = 5
+
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesWireFrameMessage(PERF3GPP) }
+
+
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
+
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
+
+ it("should mark the application healthy") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthDescription.HEALTHY)
+ }
+ }
+
+ given("failed configuration change") {
+ val (sut, _) = vesHvWithStoringSink()
+ sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ it("should mark the application unhealthy ") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ }
+ }
+ }
+
+ describe("request validation") {
+ it("should reject message with payload greater than 1 MiB and all subsequent messages") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ val handledMessages = sut.handleConnection(sink,
+ vesWireFrameMessage(PERF3GPP, "first"),
+ vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
+ vesWireFrameMessage(PERF3GPP))
+
+ assertThat(handledMessages).hasSize(1)
+ assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
+ }
+ }
+
+})
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ return Pair(sut, sink)
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt
new file mode 100644
index 00000000..29df8c70
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import org.jetbrains.spek.api.dsl.SpecBody
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Hooks
+
+fun SpecBody.debugRx(debug: Boolean = true) {
+ if (debug) {
+ beforeGroup {
+ Hooks.onOperatorDebug()
+ }
+
+ afterGroup {
+ Hooks.resetOnOperatorDebug()
+ }
+ }
+}
+
+val logger = Logger("org.onap.dcae.collectors.veshv.tests.component")
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
new file mode 100644
index 00000000..c25771b7
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import reactor.core.publisher.Flux
+
+class FakeHealthState : HealthState {
+
+ lateinit var currentHealth: HealthDescription
+
+ override fun changeState(healthDescription: HealthDescription) {
+ currentHealth = healthDescription
+ }
+
+ override fun invoke(): Flux<HealthDescription> {
+ throw NotImplementedError()
+ }
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
new file mode 100644
index 00000000..3770913a
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -0,0 +1,106 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.routing
+
+import reactor.core.publisher.FluxProcessor
+import reactor.core.publisher.UnicastProcessor
+import reactor.retry.RetryExhaustedException
+
+
+const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
+const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
+const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
+
+val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
+val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(HEARTBEAT.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(MEASUREMENT.domainName)
+ toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
+
+val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(ALTERNATE_PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
+
+val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ }.build()
+)
+
+class FakeConfigurationProvider : ConfigurationProvider {
+ private var shouldThrowException = false
+ private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
+
+ fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+ if (shouldThrowException) {
+ configStream.onError(RetryExhaustedException("I'm so tired"))
+ } else {
+ configStream.onNext(collectorConfiguration)
+ }
+
+
+ fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
+ this.shouldThrowException = shouldThrowException
+ }
+
+ override fun invoke() = configStream
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
new file mode 100644
index 00000000..aa3fdc39
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class FakeMetrics: Metrics {
+ override fun notifyBytesReceived(size: Int) {
+ }
+
+ override fun notifyMessageReceived(size: Int) {
+ }
+
+ override fun notifyMessageSent(topic: String) {
+ }
+} \ No newline at end of file
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
new file mode 100644
index 00000000..a5fd546a
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import reactor.core.publisher.Flux
+import java.util.*
+import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class StoringSink : Sink {
+ private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+
+ val sentMessages: List<RoutedMessage>
+ get() = sent.toList()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ return messages.doOnNext(sent::addLast)
+ }
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class CountingSink : Sink {
+ private val atomicCount = AtomicLong(0)
+
+ val count: Long
+ get() = atomicCount.get()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ return messages.doOnNext {
+ atomicCount.incrementAndGet()
+ }
+ }
+}
diff --git a/sources/hv-collector-ct/src/test/resources/logback-test.xml b/sources/hv-collector-ct/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..93f22771
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/resources/logback-test.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration>