diff options
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt')
-rw-r--r-- | sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt new file mode 100644 index 00000000..2d81c671 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -0,0 +1,347 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting +import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame +import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame +import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload + +import reactor.core.publisher.Flux +import java.time.Duration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object VesHvSpecification : Spek({ + debugRx(false) + + describe("VES High Volume Collector") { + it("should handle multiple HV RAN events") { + val (sut, sink) = vesHvWithStoringSink() + val messages = sut.handleConnection(sink, + vesWireFrameMessage(PERF3GPP), + vesWireFrameMessage(PERF3GPP) + ) + + assertThat(messages) + .describedAs("should send all events") + .hasSize(2) + } + } + + describe("Memory management") { + it("should release memory for each handled and dropped message") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesWireFrameMessage(PERF3GPP) + val msgWithInvalidFrame = invalidWireFrame() + val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection( + sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload) + + assertThat(handledEvents).hasSize(1) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithInvalidFrame.refCnt()) + .describedAs("message with invalid frame should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithTooBigPayload.refCnt()) + .describedAs("message with payload exceeding 1MiB should be released") + .isEqualTo(expectedRefCnt) + } + + it("should release memory for each message with invalid payload") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesWireFrameMessage(PERF3GPP) + val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload() + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload) + + assertThat(handledEvents).hasSize(1) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(expectedRefCnt) + + } + + it("should release memory for each message with garbage frame") { + val (sut, sink) = vesHvWithStoringSink() + val validMessage = vesWireFrameMessage(PERF3GPP) + val msgWithGarbageFrame = garbageFrame() + val expectedRefCnt = 0 + + val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame) + + assertThat(handledEvents).hasSize(1) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithGarbageFrame.refCnt()) + .describedAs("message with garbage frame should be released") + .isEqualTo(expectedRefCnt) + + } + } + + describe("message routing") { + it("should direct message to a topic by means of routing configuration") { + val (sut, sink) = vesHvWithStoringSink() + + val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) + } + + it("should be able to direct 2 messages from different domains to one topic") { + val (sut, sink) = vesHvWithStoringSink() + + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + + val messages = sut.handleConnection(sink, + vesWireFrameMessage(PERF3GPP), + vesWireFrameMessage(HEARTBEAT), + vesWireFrameMessage(MEASUREMENT)) + + assertThat(messages).describedAs("number of routed messages").hasSize(3) + + assertThat(messages[0].topic).describedAs("first message topic") + .isEqualTo(PERF3GPP_TOPIC) + + assertThat(messages[1].topic).describedAs("second message topic") + .isEqualTo(PERF3GPP_TOPIC) + + assertThat(messages[2].topic).describedAs("last message topic") + .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + } + + it("should drop message if route was not found") { + val (sut, sink) = vesHvWithStoringSink() + val messages = sut.handleConnection(sink, + vesWireFrameMessage(OTHER, "first"), + vesWireFrameMessage(PERF3GPP, "second"), + vesWireFrameMessage(HEARTBEAT, "third")) + + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") + } + } + + describe("configuration update") { + + val defaultTimeout = Duration.ofSeconds(10) + + given("successful configuration change") { + + lateinit var sut: Sut + lateinit var sink: StoringSink + + beforeEachTest { + vesHvWithStoringSink().run { + sut = first + sink = second + } + } + + it("should update collector") { + val firstCollector = sut.collector + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + val collectorAfterUpdate = sut.collector + + assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) + } + + it("should start routing messages") { + + sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + + val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messages).isEmpty() + + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messagesAfterUpdate).hasSize(1) + val message = messagesAfterUpdate[0] + + assertThat(message.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(PERF3GPP_TOPIC) + assertThat(message.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should change domain routing") { + + val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messages).hasSize(1) + val firstMessage = messages[0] + + assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + .isEqualTo(PERF3GPP_TOPIC) + assertThat(firstMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + + + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + assertThat(messagesAfterUpdate).hasSize(2) + val secondMessage = messagesAfterUpdate[1] + + assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) + assertThat(secondMessage.partition).describedAs("routed message partition") + .isEqualTo(0) + } + + it("should update routing for each client sending one message") { + + val messagesAmount = 10 + val messagesForEachTopic = 5 + + Flux.range(0, messagesAmount).doOnNext { + if (it == messagesForEachTopic) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + } + }.doOnNext { + sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) + }.then().block(defaultTimeout) + + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + + assertThat(messages.size).isEqualTo(messagesAmount) + assertThat(messagesForEachTopic) + .describedAs("amount of messages routed to each topic") + .isEqualTo(firstTopicMessagesCount) + .isEqualTo(secondTopicMessagesCount) + } + + it("should not update routing for client sending continuous stream of messages") { + + val messageStreamSize = 10 + val pivot = 5 + + val incomingMessages = Flux.range(0, messageStreamSize) + .doOnNext { + if (it == pivot) { + sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) + println("config changed") + } + } + .map { vesWireFrameMessage(PERF3GPP) } + + + sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) + + val messages = sink.sentMessages + val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + + assertThat(messages.size).isEqualTo(messageStreamSize) + assertThat(firstTopicMessagesCount) + .describedAs("amount of messages routed to first topic") + .isEqualTo(messageStreamSize) + + assertThat(secondTopicMessagesCount) + .describedAs("amount of messages routed to second topic") + .isEqualTo(0) + } + + it("should mark the application healthy") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthDescription.HEALTHY) + } + } + + given("failed configuration change") { + val (sut, _) = vesHvWithStoringSink() + sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) + sut.configurationProvider.updateConfiguration(basicConfiguration) + + it("should mark the application unhealthy ") { + assertThat(sut.healthStateProvider.currentHealth) + .describedAs("application health state") + .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) + } + } + } + + describe("request validation") { + it("should reject message with payload greater than 1 MiB and all subsequent messages") { + val (sut, sink) = vesHvWithStoringSink() + + val handledMessages = sut.handleConnection(sink, + vesWireFrameMessage(PERF3GPP, "first"), + vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), + vesWireFrameMessage(PERF3GPP)) + + assertThat(handledMessages).hasSize(1) + assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") + } + } + +}) + +private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { + val sink = StoringSink() + val sut = Sut(sink) + sut.configurationProvider.updateConfiguration(basicConfiguration) + return Pair(sut, sink) +} |