aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct/src
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct/src')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt203
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt68
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt347
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt38
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt37
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt106
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt37
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt59
-rw-r--r--hv-collector-ct/src/test/resources/logback-test.xml35
9 files changed, 0 insertions, 930 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
deleted file mode 100644
index 0897e910..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.component
-
-import arrow.syntax.function.partially1
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import io.netty.buffer.CompositeByteBuf
-import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import reactor.core.publisher.Flux
-import reactor.math.sum
-import java.security.MessageDigest
-import java.time.Duration
-import java.util.*
-import kotlin.system.measureTimeMillis
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object PerformanceSpecification : Spek({
- debugRx(false)
-
- describe("VES High Volume Collector performance") {
- it("should handle multiple clients in reasonable time") {
- val sink = CountingSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
-
- val numMessages: Long = 300_000
- val runs = 4
- val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
-
- val params = MessageParameters(
- commonEventHeader = commonHeader(PERF3GPP),
- messageType = VALID,
- amount = numMessages
- )
-
- val fluxes = (1.rangeTo(runs)).map {
- sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
- }
- val durationMs = measureTimeMillis {
- Flux.merge(fluxes).then().block(timeout)
- }
-
- val durationSec = durationMs / 1000.0
- val throughput = sink.count / durationSec
- logger.info("Processed $runs connections each containing $numMessages msgs.")
- logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
- assertThat(sink.count)
- .describedAs("should send all events")
- .isEqualTo(runs * numMessages)
- }
-
- it("should disconnect on transmission errors") {
- val sink = CountingSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
-
- val numMessages: Long = 100_000
- val timeout = Duration.ofSeconds(30)
-
- val params = MessageParameters(
- commonEventHeader = commonHeader(PERF3GPP),
- messageType = VALID,
- amount = numMessages
- )
-
- val dataStream = generateDataStream(sut.alloc, params)
- .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
- sut.collector.handleConnection(sut.alloc, dataStream)
- .timeout(timeout)
- .block()
-
- logger.info("Forwarded ${sink.count} msgs")
- assertThat(sink.count)
- .describedAs("should send up to number of events")
- .isLessThan(numMessages)
- }
- }
-
- describe("test infrastructure") {
- val digest = MessageDigest.getInstance("MD5")
-
- fun collectDigest(bb: ByteBuf) {
- bb.markReaderIndex()
- while (bb.isReadable) {
- digest.update(bb.readByte())
- }
- bb.resetReaderIndex()
- }
-
- fun calculateDigest(arrays: List<ByteArray>): ByteArray {
- for (array in arrays) {
- digest.update(array)
- }
- return digest.digest()
- }
-
- it("should yield same bytes as in the input") {
- val numberOfBuffers = 10
- val singleBufferSize = 1000
- val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
- val inputDigest = calculateDigest(arrays)
-
- val actualTotalSize = Flux.fromIterable(arrays)
- .map { Unpooled.wrappedBuffer(it) }
- .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
- .doOnNext(::collectDigest)
- .map {
- val size = it.readableBytes()
- it.release()
- size
- }
- .sum()
- .map(Long::toInt)
- .block()
-
- val outputDigest = digest.digest()
-
- assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize)
- assertThat(outputDigest).isEqualTo(inputDigest)
-
- }
- }
-})
-
-
-private const val ONE_MILION = 1_000_000.0
-
-private val rand = Random()
-private fun randomByteArray(size: Int): ByteArray {
- val bytes = ByteArray(size)
- rand.nextBytes(bytes)
- return bytes
-}
-
-fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
- stream.index()
- .filter { predicate(it.t1) }
- .map { it.t2 }
-
-private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
- WireFrameEncoder(alloc).let { encoder ->
- MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
- .createMessageFlux(listOf(params))
- .map(encoder::encode)
- .transform { simulateRemoteTcp(alloc, 1000, it) }
- }
-
-private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
- byteBuffers
- .bufferTimeout(maxSize, Duration.ofMillis(250))
- .map { joinBuffers(alloc, it) }
- .concatMap { randomlySplitTcpFrames(it) }
-
-private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
- alloc.compositeBuffer().addComponents(true, it)
-
-private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
- val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
- return Flux.create<ByteBuf> { sink ->
- while (bb.isReadable) {
- val frameSize = Math.min(targetFrameSize, bb.readableBytes())
- sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
- bb.readerIndex(bb.readerIndex() + frameSize)
- }
- bb.release()
- sink.complete()
- }
-}
-
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
deleted file mode 100644
index 0495ced5..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.component
-
-import arrow.core.getOrElse
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import io.netty.buffer.UnpooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import reactor.core.publisher.Flux
-import java.time.Duration
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class Sut(sink: Sink = StoringSink()) {
- val configurationProvider = FakeConfigurationProvider()
- val healthStateProvider = FakeHealthState()
-
- val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
- private val metrics = FakeMetrics()
- private val collectorFactory = CollectorFactory(
- configurationProvider,
- SinkProvider.just(sink),
- metrics,
- MAX_PAYLOAD_SIZE_BYTES,
- healthStateProvider)
- private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
-
- val collector: Collector
- get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
-
- companion object {
- const val MAX_PAYLOAD_SIZE_BYTES = 1024
- }
-
-}
-
-fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
- return sink.sentMessages
-}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
deleted file mode 100644
index 2d81c671..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.component
-
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
-import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
-import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-
-import reactor.core.publisher.Flux
-import java.time.Duration
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object VesHvSpecification : Spek({
- debugRx(false)
-
- describe("VES High Volume Collector") {
- it("should handle multiple HV RAN events") {
- val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink,
- vesWireFrameMessage(PERF3GPP),
- vesWireFrameMessage(PERF3GPP)
- )
-
- assertThat(messages)
- .describedAs("should send all events")
- .hasSize(2)
- }
- }
-
- describe("Memory management") {
- it("should release memory for each handled and dropped message") {
- val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(PERF3GPP)
- val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
- val expectedRefCnt = 0
-
- val handledEvents = sut.handleConnection(
- sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
-
- assertThat(handledEvents).hasSize(1)
-
- assertThat(validMessage.refCnt())
- .describedAs("handled message should be released")
- .isEqualTo(expectedRefCnt)
- assertThat(msgWithInvalidFrame.refCnt())
- .describedAs("message with invalid frame should be released")
- .isEqualTo(expectedRefCnt)
- assertThat(msgWithTooBigPayload.refCnt())
- .describedAs("message with payload exceeding 1MiB should be released")
- .isEqualTo(expectedRefCnt)
- }
-
- it("should release memory for each message with invalid payload") {
- val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(PERF3GPP)
- val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
- val expectedRefCnt = 0
-
- val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
-
- assertThat(handledEvents).hasSize(1)
-
- assertThat(validMessage.refCnt())
- .describedAs("handled message should be released")
- .isEqualTo(expectedRefCnt)
- assertThat(msgWithInvalidPayload.refCnt())
- .describedAs("message with invalid payload should be released")
- .isEqualTo(expectedRefCnt)
-
- }
-
- it("should release memory for each message with garbage frame") {
- val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(PERF3GPP)
- val msgWithGarbageFrame = garbageFrame()
- val expectedRefCnt = 0
-
- val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
-
- assertThat(handledEvents).hasSize(1)
-
- assertThat(validMessage.refCnt())
- .describedAs("handled message should be released")
- .isEqualTo(expectedRefCnt)
- assertThat(msgWithGarbageFrame.refCnt())
- .describedAs("message with garbage frame should be released")
- .isEqualTo(expectedRefCnt)
-
- }
- }
-
- describe("message routing") {
- it("should direct message to a topic by means of routing configuration") {
- val (sut, sink) = vesHvWithStoringSink()
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).describedAs("number of routed messages").hasSize(1)
-
- val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
- }
-
- it("should be able to direct 2 messages from different domains to one topic") {
- val (sut, sink) = vesHvWithStoringSink()
-
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
-
- val messages = sut.handleConnection(sink,
- vesWireFrameMessage(PERF3GPP),
- vesWireFrameMessage(HEARTBEAT),
- vesWireFrameMessage(MEASUREMENT))
-
- assertThat(messages).describedAs("number of routed messages").hasSize(3)
-
- assertThat(messages[0].topic).describedAs("first message topic")
- .isEqualTo(PERF3GPP_TOPIC)
-
- assertThat(messages[1].topic).describedAs("second message topic")
- .isEqualTo(PERF3GPP_TOPIC)
-
- assertThat(messages[2].topic).describedAs("last message topic")
- .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- }
-
- it("should drop message if route was not found") {
- val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink,
- vesWireFrameMessage(OTHER, "first"),
- vesWireFrameMessage(PERF3GPP, "second"),
- vesWireFrameMessage(HEARTBEAT, "third"))
-
- assertThat(messages).describedAs("number of routed messages").hasSize(1)
-
- val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
- assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
- }
- }
-
- describe("configuration update") {
-
- val defaultTimeout = Duration.ofSeconds(10)
-
- given("successful configuration change") {
-
- lateinit var sut: Sut
- lateinit var sink: StoringSink
-
- beforeEachTest {
- vesHvWithStoringSink().run {
- sut = first
- sink = second
- }
- }
-
- it("should update collector") {
- val firstCollector = sut.collector
-
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val collectorAfterUpdate = sut.collector
-
- assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
- }
-
- it("should start routing messages") {
-
- sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).isEmpty()
-
- sut.configurationProvider.updateConfiguration(basicConfiguration)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(1)
- val message = messagesAfterUpdate[0]
-
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
- }
-
- it("should change domain routing") {
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).hasSize(1)
- val firstMessage = messages[0]
-
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
-
-
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(2)
- val secondMessage = messagesAfterUpdate[1]
-
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
- assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
- }
-
- it("should update routing for each client sending one message") {
-
- val messagesAmount = 10
- val messagesForEachTopic = 5
-
- Flux.range(0, messagesAmount).doOnNext {
- if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- }
- }.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- }.then().block(defaultTimeout)
-
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messagesAmount)
- assertThat(messagesForEachTopic)
- .describedAs("amount of messages routed to each topic")
- .isEqualTo(firstTopicMessagesCount)
- .isEqualTo(secondTopicMessagesCount)
- }
-
- it("should not update routing for client sending continuous stream of messages") {
-
- val messageStreamSize = 10
- val pivot = 5
-
- val incomingMessages = Flux.range(0, messageStreamSize)
- .doOnNext {
- if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- println("config changed")
- }
- }
- .map { vesWireFrameMessage(PERF3GPP) }
-
-
- sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messageStreamSize)
- assertThat(firstTopicMessagesCount)
- .describedAs("amount of messages routed to first topic")
- .isEqualTo(messageStreamSize)
-
- assertThat(secondTopicMessagesCount)
- .describedAs("amount of messages routed to second topic")
- .isEqualTo(0)
- }
-
- it("should mark the application healthy") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.HEALTHY)
- }
- }
-
- given("failed configuration change") {
- val (sut, _) = vesHvWithStoringSink()
- sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
-
- it("should mark the application unhealthy ") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
- }
- }
- }
-
- describe("request validation") {
- it("should reject message with payload greater than 1 MiB and all subsequent messages") {
- val (sut, sink) = vesHvWithStoringSink()
-
- val handledMessages = sut.handleConnection(sink,
- vesWireFrameMessage(PERF3GPP, "first"),
- vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
- vesWireFrameMessage(PERF3GPP))
-
- assertThat(handledMessages).hasSize(1)
- assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
- }
- }
-
-})
-
-private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
- val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- return Pair(sut, sink)
-}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt
deleted file mode 100644
index 29df8c70..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.component
-
-import org.jetbrains.spek.api.dsl.SpecBody
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Hooks
-
-fun SpecBody.debugRx(debug: Boolean = true) {
- if (debug) {
- beforeGroup {
- Hooks.onOperatorDebug()
- }
-
- afterGroup {
- Hooks.resetOnOperatorDebug()
- }
- }
-}
-
-val logger = Logger("org.onap.dcae.collectors.veshv.tests.component")
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
deleted file mode 100644
index c25771b7..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import reactor.core.publisher.Flux
-
-class FakeHealthState : HealthState {
-
- lateinit var currentHealth: HealthDescription
-
- override fun changeState(healthDescription: HealthDescription) {
- currentHealth = healthDescription
- }
-
- override fun invoke(): Flux<HealthDescription> {
- throw NotImplementedError()
- }
-}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
deleted file mode 100644
index 3770913a..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
-
-import reactor.core.publisher.FluxProcessor
-import reactor.core.publisher.UnicastProcessor
-import reactor.retry.RetryExhaustedException
-
-
-const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
-const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
-const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-
-val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
-
-val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(HEARTBEAT.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(MEASUREMENT.domainName)
- toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
-
-
-val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(ALTERNATE_PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
-
-
-val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
- routing = routing {
- }.build()
-)
-
-class FakeConfigurationProvider : ConfigurationProvider {
- private var shouldThrowException = false
- private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
-
- fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
- if (shouldThrowException) {
- configStream.onError(RetryExhaustedException("I'm so tired"))
- } else {
- configStream.onNext(collectorConfiguration)
- }
-
-
- fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
- this.shouldThrowException = shouldThrowException
- }
-
- override fun invoke() = configStream
-}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
deleted file mode 100644
index aa3fdc39..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class FakeMetrics: Metrics {
- override fun notifyBytesReceived(size: Int) {
- }
-
- override fun notifyMessageReceived(size: Int) {
- }
-
- override fun notifyMessageSent(topic: String) {
- }
-} \ No newline at end of file
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
deleted file mode 100644
index a5fd546a..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import reactor.core.publisher.Flux
-import java.util.*
-import java.util.concurrent.ConcurrentLinkedDeque
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class StoringSink : Sink {
- private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
-
- val sentMessages: List<RoutedMessage>
- get() = sent.toList()
-
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- return messages.doOnNext(sent::addLast)
- }
-}
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class CountingSink : Sink {
- private val atomicCount = AtomicLong(0)
-
- val count: Long
- get() = atomicCount.get()
-
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- return messages.doOnNext {
- atomicCount.incrementAndGet()
- }
- }
-}
diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml
deleted file mode 100644
index 93f22771..00000000
--- a/hv-collector-ct/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
- <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
-
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>
- %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
- </pattern>
- </encoder>
- </appender>
-
- <appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
- </encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
- <maxFileSize>50MB</maxFileSize>
- <maxHistory>30</maxHistory>
- <totalSizeCap>10GB</totalSizeCap>
- </rollingPolicy>
- </appender>
-
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- <appender-ref ref="ROLLING-FILE"/>
- </root>
-</configuration>