diff options
Diffstat (limited to 'hv-collector-ct/src')
3 files changed, 62 insertions, 59 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index ba29844a..8e6103c9 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.core.publisher.Flux import reactor.math.sum import java.security.MessageDigest @@ -62,7 +62,7 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) val params = MessageParameters( - commonEventHeader = commonHeader(HVRANMEAS), + commonEventHeader = commonHeader(HVMEAS), messageType = VALID, amount = numMessages ) @@ -92,7 +92,7 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofSeconds(30) val params = MessageParameters( - commonEventHeader = commonHeader(HVRANMEAS), + commonEventHeader = commonHeader(HVMEAS), messageType = VALID, amount = numMessages ) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index a9f3e9a4..60e10ee0 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,9 +24,13 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration @@ -39,7 +43,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + import reactor.core.publisher.Flux import java.time.Duration @@ -54,8 +58,8 @@ object VesHvSpecification : Spek({ it("should handle multiple HV RAN events") { val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.HVRANMEAS), - vesWireFrameMessage(Domain.HVRANMEAS) + vesWireFrameMessage(HVMEAS), + vesWireFrameMessage(HVMEAS) ) assertThat(messages) @@ -65,8 +69,8 @@ object VesHvSpecification : Spek({ it("should not handle messages received from client after end-of-transmission message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) - val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) + val anotherValidMessage = vesWireFrameMessage(HVMEAS) val endOfTransmissionMessage = endOfTransmissionWireMessage() val handledEvents = sut.handleConnection(sink, @@ -91,23 +95,19 @@ object VesHvSpecification : Spek({ describe("Memory management") { it("should release memory for each handled and dropped message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) - val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER) + val validMessage = vesWireFrameMessage(HVMEAS) val msgWithInvalidFrame = invalidWireFrame() - val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS) + val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS) val expectedRefCnt = 0 val handledEvents = sut.handleConnection( - sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload) + sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload) assertThat(handledEvents).hasSize(1) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") .isEqualTo(expectedRefCnt) - assertThat(msgWithInvalidDomain.refCnt()) - .describedAs("message with invalid domain should be released") - .isEqualTo(expectedRefCnt) assertThat(msgWithInvalidFrame.refCnt()) .describedAs("message with invalid frame should be released") .isEqualTo(expectedRefCnt) @@ -118,7 +118,7 @@ object VesHvSpecification : Spek({ it("should release memory for end-of-transmission message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) val endOfTransmissionMessage = endOfTransmissionWireMessage() val expectedRefCnt = 0 @@ -138,7 +138,7 @@ object VesHvSpecification : Spek({ it("should release memory for each message with invalid payload") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload() val expectedRefCnt = 0 @@ -157,7 +157,7 @@ object VesHvSpecification : Spek({ it("should release memory for each message with garbage frame") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 @@ -179,11 +179,11 @@ object VesHvSpecification : Spek({ it("should direct message to a topic by means of routing configuration") { val (sut, sink) = vesHvWithStoringSink() - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC) assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } @@ -193,17 +193,17 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) val messages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.HVRANMEAS), - vesWireFrameMessage(Domain.HEARTBEAT), - vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING)) + vesWireFrameMessage(HVMEAS), + vesWireFrameMessage(HEARTBEAT), + vesWireFrameMessage(MEASUREMENT)) assertThat(messages).describedAs("number of routed messages").hasSize(3) assertThat(messages[0].topic).describedAs("first message topic") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(messages[1].topic).describedAs("second message topic") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(messages[2].topic).describedAs("last message topic") .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) @@ -212,14 +212,14 @@ object VesHvSpecification : Spek({ it("should drop message if route was not found") { val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.OTHER, "first"), - vesWireFrameMessage(Domain.HVRANMEAS, "second"), - vesWireFrameMessage(Domain.HEARTBEAT, "third")) + vesWireFrameMessage(OTHER, "first"), + vesWireFrameMessage(HVMEAS, "second"), + vesWireFrameMessage(HEARTBEAT, "third")) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC) assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } @@ -253,41 +253,41 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithoutRouting) - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messages).isEmpty() sut.configurationProvider.updateConfiguration(basicConfiguration) - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] assertThat(message.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(message.partition).describedAs("routed message partition") .isEqualTo(0) } it("should change domain routing") { - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messages).hasSize(1) val firstMessage = messages[0] assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(firstMessage.partition).describedAs("routed message partition") .isEqualTo(0) sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) + .isEqualTo(ALTERNATE_HVMEAS_TOPIC) assertThat(secondMessage.partition).describedAs("routed message partition") .isEqualTo(0) } @@ -302,13 +302,13 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) } }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) }.then().block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC } assertThat(messages.size).isEqualTo(messagesAmount) assertThat(messagesForEachTopic) @@ -329,14 +329,14 @@ object VesHvSpecification : Spek({ println("config changed") } } - .map { vesWireFrameMessage(Domain.HVRANMEAS) } + .map { vesWireFrameMessage(HVMEAS) } sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC } assertThat(messages.size).isEqualTo(messageStreamSize) assertThat(firstTopicMessagesCount) @@ -373,9 +373,9 @@ object VesHvSpecification : Spek({ val (sut, sink) = vesHvWithStoringSink() val handledMessages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.HVRANMEAS, "first"), - vesMessageWithTooBigPayload(Domain.HVRANMEAS), - vesWireFrameMessage(Domain.HVRANMEAS)) + vesWireFrameMessage(HVMEAS, "first"), + vesMessageWithTooBigPayload(HVMEAS), + vesWireFrameMessage(HVMEAS)) assertThat(handledMessages).hasSize(1) assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index ebeaa69e..688f2758 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,24 +20,27 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.routing -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException -const val HVRANMEAS_TOPIC = "ves_hvRanMeas" +const val HVMEAS_TOPIC = "ves_hvRanMeas" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling" -const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas" +const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic(HVRANMEAS_TOPIC) + fromDomain(HVMEAS.name) + toTopic(HVMEAS_TOPIC) withFixedPartitioning() } }.build() @@ -47,17 +50,17 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic(HVRANMEAS_TOPIC) + fromDomain(HVMEAS.name) + toTopic(HVMEAS_TOPIC) withFixedPartitioning() } defineRoute { - fromDomain(Domain.HEARTBEAT) - toTopic(HVRANMEAS_TOPIC) + fromDomain(HEARTBEAT.name) + toTopic(HVMEAS_TOPIC) withFixedPartitioning() } defineRoute { - fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING) + fromDomain(MEASUREMENT.name) toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) withFixedPartitioning() } @@ -69,8 +72,8 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic(ALTERNATE_HVRANMEAS_TOPIC) + fromDomain(HVMEAS.name) + toTopic(ALTERNATE_HVMEAS_TOPIC) withFixedPartitioning() } }.build() |