diff options
Diffstat (limited to 'hv-collector-ct/src')
9 files changed, 0 insertions, 930 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt deleted file mode 100644 index 0897e910..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ /dev/null @@ -1,203 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import arrow.syntax.function.partially1 -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import io.netty.buffer.CompositeByteBuf -import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID -import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory -import reactor.core.publisher.Flux -import reactor.math.sum -import java.security.MessageDigest -import java.time.Duration -import java.util.* -import kotlin.system.measureTimeMillis - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object PerformanceSpecification : Spek({ - debugRx(false) - - describe("VES High Volume Collector performance") { - it("should handle multiple clients in reasonable time") { - val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) - - val numMessages: Long = 300_000 - val runs = 4 - val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) - - val params = MessageParameters( - commonEventHeader = commonHeader(PERF3GPP), - messageType = VALID, - amount = numMessages - ) - - val fluxes = (1.rangeTo(runs)).map { - sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params)) - } - val durationMs = measureTimeMillis { - Flux.merge(fluxes).then().block(timeout) - } - - val durationSec = durationMs / 1000.0 - val throughput = sink.count / durationSec - logger.info("Processed $runs connections each containing $numMessages msgs.") - logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") - assertThat(sink.count) - .describedAs("should send all events") - .isEqualTo(runs * numMessages) - } - - it("should disconnect on transmission errors") { - val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) - - val numMessages: Long = 100_000 - val timeout = Duration.ofSeconds(30) - - val params = MessageParameters( - commonEventHeader = commonHeader(PERF3GPP), - messageType = VALID, - amount = numMessages - ) - - val dataStream = generateDataStream(sut.alloc, params) - .transform(::dropWhenIndex.partially1 { it % 101 == 0L }) - sut.collector.handleConnection(sut.alloc, dataStream) - .timeout(timeout) - .block() - - logger.info("Forwarded ${sink.count} msgs") - assertThat(sink.count) - .describedAs("should send up to number of events") - .isLessThan(numMessages) - } - } - - describe("test infrastructure") { - val digest = MessageDigest.getInstance("MD5") - - fun collectDigest(bb: ByteBuf) { - bb.markReaderIndex() - while (bb.isReadable) { - digest.update(bb.readByte()) - } - bb.resetReaderIndex() - } - - fun calculateDigest(arrays: List<ByteArray>): ByteArray { - for (array in arrays) { - digest.update(array) - } - return digest.digest() - } - - it("should yield same bytes as in the input") { - val numberOfBuffers = 10 - val singleBufferSize = 1000 - val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) } - val inputDigest = calculateDigest(arrays) - - val actualTotalSize = Flux.fromIterable(arrays) - .map { Unpooled.wrappedBuffer(it) } - .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) } - .doOnNext(::collectDigest) - .map { - val size = it.readableBytes() - it.release() - size - } - .sum() - .map(Long::toInt) - .block() - - val outputDigest = digest.digest() - - assertThat(actualTotalSize!!).isEqualTo(numberOfBuffers * singleBufferSize) - assertThat(outputDigest).isEqualTo(inputDigest) - - } - } -}) - - -private const val ONE_MILION = 1_000_000.0 - -private val rand = Random() -private fun randomByteArray(size: Int): ByteArray { - val bytes = ByteArray(size) - rand.nextBytes(bytes) - return bytes -} - -fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> = - stream.index() - .filter { predicate(it.t1) } - .map { it.t2 } - -private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = - WireFrameEncoder(alloc).let { encoder -> - MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES) - .createMessageFlux(listOf(params)) - .map(encoder::encode) - .transform { simulateRemoteTcp(alloc, 1000, it) } - } - -private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) = - byteBuffers - .bufferTimeout(maxSize, Duration.ofMillis(250)) - .map { joinBuffers(alloc, it) } - .concatMap { randomlySplitTcpFrames(it) } - -private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) = - alloc.compositeBuffer().addComponents(true, it) - -private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { - val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt()) - return Flux.create<ByteBuf> { sink -> - while (bb.isReadable) { - val frameSize = Math.min(targetFrameSize, bb.readableBytes()) - sink.next(bb.retainedSlice(bb.readerIndex(), frameSize)) - bb.readerIndex(bb.readerIndex() + frameSize) - } - bb.release() - sink.complete() - } -} - diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt deleted file mode 100644 index 0495ced5..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import arrow.core.getOrElse -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import io.netty.buffer.UnpooledByteBufAllocator -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.factory.CollectorFactory -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider -import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState -import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics -import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import reactor.core.publisher.Flux -import java.time.Duration - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class Sut(sink: Sink = StoringSink()) { - val configurationProvider = FakeConfigurationProvider() - val healthStateProvider = FakeHealthState() - - val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT - private val metrics = FakeMetrics() - private val collectorFactory = CollectorFactory( - configurationProvider, - SinkProvider.just(sink), - metrics, - MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider) - private val collectorProvider = collectorFactory.createVesHvCollectorProvider() - - val collector: Collector - get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } - - companion object { - const val MAX_PAYLOAD_SIZE_BYTES = 1024 - } - -} - -fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - return sink.sentMessages -} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt deleted file mode 100644 index 2d81c671..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ /dev/null @@ -1,347 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame -import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize -import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage -import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload - -import reactor.core.publisher.Flux -import java.time.Duration - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object VesHvSpecification : Spek({ - debugRx(false) - - describe("VES High Volume Collector") { - it("should handle multiple HV RAN events") { - val (sut, sink) = vesHvWithStoringSink() - val messages = sut.handleConnection(sink, - vesWireFrameMessage(PERF3GPP), - vesWireFrameMessage(PERF3GPP) - ) - - assertThat(messages) - .describedAs("should send all events") - .hasSize(2) - } - } - - describe("Memory management") { - it("should release memory for each handled and dropped message") { - val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(PERF3GPP) - val msgWithInvalidFrame = invalidWireFrame() - val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) - val expectedRefCnt = 0 - - val handledEvents = sut.handleConnection( - sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload) - - assertThat(handledEvents).hasSize(1) - - assertThat(validMessage.refCnt()) - .describedAs("handled message should be released") - .isEqualTo(expectedRefCnt) - assertThat(msgWithInvalidFrame.refCnt()) - .describedAs("message with invalid frame should be released") - .isEqualTo(expectedRefCnt) - assertThat(msgWithTooBigPayload.refCnt()) - .describedAs("message with payload exceeding 1MiB should be released") - .isEqualTo(expectedRefCnt) - } - - it("should release memory for each message with invalid payload") { - val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(PERF3GPP) - val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload() - val expectedRefCnt = 0 - - val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload) - - assertThat(handledEvents).hasSize(1) - - assertThat(validMessage.refCnt()) - .describedAs("handled message should be released") - .isEqualTo(expectedRefCnt) - assertThat(msgWithInvalidPayload.refCnt()) - .describedAs("message with invalid payload should be released") - .isEqualTo(expectedRefCnt) - - } - - it("should release memory for each message with garbage frame") { - val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(PERF3GPP) - val msgWithGarbageFrame = garbageFrame() - val expectedRefCnt = 0 - - val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame) - - assertThat(handledEvents).hasSize(1) - - assertThat(validMessage.refCnt()) - .describedAs("handled message should be released") - .isEqualTo(expectedRefCnt) - assertThat(msgWithGarbageFrame.refCnt()) - .describedAs("message with garbage frame should be released") - .isEqualTo(expectedRefCnt) - - } - } - - describe("message routing") { - it("should direct message to a topic by means of routing configuration") { - val (sut, sink) = vesHvWithStoringSink() - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).describedAs("number of routed messages").hasSize(1) - - val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) - assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) - } - - it("should be able to direct 2 messages from different domains to one topic") { - val (sut, sink) = vesHvWithStoringSink() - - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) - - val messages = sut.handleConnection(sink, - vesWireFrameMessage(PERF3GPP), - vesWireFrameMessage(HEARTBEAT), - vesWireFrameMessage(MEASUREMENT)) - - assertThat(messages).describedAs("number of routed messages").hasSize(3) - - assertThat(messages[0].topic).describedAs("first message topic") - .isEqualTo(PERF3GPP_TOPIC) - - assertThat(messages[1].topic).describedAs("second message topic") - .isEqualTo(PERF3GPP_TOPIC) - - assertThat(messages[2].topic).describedAs("last message topic") - .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - } - - it("should drop message if route was not found") { - val (sut, sink) = vesHvWithStoringSink() - val messages = sut.handleConnection(sink, - vesWireFrameMessage(OTHER, "first"), - vesWireFrameMessage(PERF3GPP, "second"), - vesWireFrameMessage(HEARTBEAT, "third")) - - assertThat(messages).describedAs("number of routed messages").hasSize(1) - - val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) - assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") - } - } - - describe("configuration update") { - - val defaultTimeout = Duration.ofSeconds(10) - - given("successful configuration change") { - - lateinit var sut: Sut - lateinit var sink: StoringSink - - beforeEachTest { - vesHvWithStoringSink().run { - sut = first - sink = second - } - } - - it("should update collector") { - val firstCollector = sut.collector - - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val collectorAfterUpdate = sut.collector - - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) - } - - it("should start routing messages") { - - sut.configurationProvider.updateConfiguration(configurationWithoutRouting) - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).isEmpty() - - sut.configurationProvider.updateConfiguration(basicConfiguration) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] - - assertThat(message.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) - } - - it("should change domain routing") { - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] - - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) - - - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] - - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) - } - - it("should update routing for each client sending one message") { - - val messagesAmount = 10 - val messagesForEachTopic = 5 - - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - }.then().block(defaultTimeout) - - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } - - it("should not update routing for client sending continuous stream of messages") { - - val messageStreamSize = 10 - val pivot = 5 - - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(PERF3GPP) } - - - sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) - - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) - } - - it("should mark the application healthy") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.HEALTHY) - } - } - - given("failed configuration change") { - val (sut, _) = vesHvWithStoringSink() - sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicConfiguration) - - it("should mark the application unhealthy ") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) - } - } - } - - describe("request validation") { - it("should reject message with payload greater than 1 MiB and all subsequent messages") { - val (sut, sink) = vesHvWithStoringSink() - - val handledMessages = sut.handleConnection(sink, - vesWireFrameMessage(PERF3GPP, "first"), - vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), - vesWireFrameMessage(PERF3GPP)) - - assertThat(handledMessages).hasSize(1) - assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") - } - } - -}) - -private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { - val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) - return Pair(sut, sink) -} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt deleted file mode 100644 index 29df8c70..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import org.jetbrains.spek.api.dsl.SpecBody -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Hooks - -fun SpecBody.debugRx(debug: Boolean = true) { - if (debug) { - beforeGroup { - Hooks.onOperatorDebug() - } - - afterGroup { - Hooks.resetOnOperatorDebug() - } - } -} - -val logger = Logger("org.onap.dcae.collectors.veshv.tests.component") diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt deleted file mode 100644 index c25771b7..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import reactor.core.publisher.Flux - -class FakeHealthState : HealthState { - - lateinit var currentHealth: HealthDescription - - override fun changeState(healthDescription: HealthDescription) { - currentHealth = healthDescription - } - - override fun invoke(): Flux<HealthDescription> { - throw NotImplementedError() - } -} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt deleted file mode 100644 index 3770913a..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.routing - -import reactor.core.publisher.FluxProcessor -import reactor.core.publisher.UnicastProcessor -import reactor.retry.RetryExhaustedException - - -const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" -const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" -const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" - -val basicConfiguration: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) - -val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } - }.build() -) - - -val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) - - -val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( - kafkaBootstrapServers = "localhost:9969", - routing = routing { - }.build() -) - -class FakeConfigurationProvider : ConfigurationProvider { - private var shouldThrowException = false - private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() - - fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = - if (shouldThrowException) { - configStream.onError(RetryExhaustedException("I'm so tired")) - } else { - configStream.onNext(collectorConfiguration) - } - - - fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { - this.shouldThrowException = shouldThrowException - } - - override fun invoke() = configStream -} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt deleted file mode 100644 index aa3fdc39..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.boundary.Metrics - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class FakeMetrics: Metrics { - override fun notifyBytesReceived(size: Int) { - } - - override fun notifyMessageReceived(size: Int) { - } - - override fun notifyMessageSent(topic: String) { - } -}
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt deleted file mode 100644 index a5fd546a..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import reactor.core.publisher.Flux -import java.util.* -import java.util.concurrent.ConcurrentLinkedDeque -import java.util.concurrent.atomic.AtomicLong - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class StoringSink : Sink { - private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() - - val sentMessages: List<RoutedMessage> - get() = sent.toList() - - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - return messages.doOnNext(sent::addLast) - } -} - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class CountingSink : Sink { - private val atomicCount = AtomicLong(0) - - val count: Long - get() = atomicCount.get() - - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - return messages.doOnNext { - atomicCount.incrementAndGet() - } - } -} diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml deleted file mode 100644 index 93f22771..00000000 --- a/hv-collector-ct/src/test/resources/logback-test.xml +++ /dev/null @@ -1,35 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> - - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> - - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> - </appender> - - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> -</configuration> |