diff options
Diffstat (limited to 'hv-collector-ct/src')
3 files changed, 40 insertions, 155 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 928c62fb..1e22d4c0 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.tests.component import arrow.syntax.function.partially1 -import com.google.protobuf.ByteString import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.CompositeByteBuf @@ -33,17 +32,16 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID -import org.onap.ves.VesEventV5 -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.core.publisher.Flux import reactor.math.sum import java.security.MessageDigest import java.time.Duration -import java.util.* +import java.util.Random import kotlin.system.measureTimeMillis /** @@ -64,7 +62,7 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) val params = MessageParameters( - commonEventHeader = createSampleCommonHeader(HVRANMEAS), + commonEventHeader = commonHeader(HVRANMEAS), messageType = VALID, amount = numMessages ) @@ -94,7 +92,7 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofSeconds(30) val params = MessageParameters( - commonEventHeader = createSampleCommonHeader(HVRANMEAS), + commonEventHeader = commonHeader(HVRANMEAS), messageType = VALID, amount = numMessages ) @@ -203,20 +201,3 @@ private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> { } } -private fun createSampleCommonHeader(domain: Domain): VesEventV5.VesEvent.CommonEventHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder() - .setVersion("sample-version") - .setDomain(domain) - .setSequence(1) - .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL) - .setEventId("sample-event-id") - .setEventName("sample-event-name") - .setEventType("sample-event-type") - .setStartEpochMicrosec(120034455) - .setLastEpochMicrosec(120034455) - .setNfNamingCode("sample-nf-naming-code") - .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) - .setSourceId(ByteString.copyFromUtf8("sample-source-id")) - .setSourceName("sample-source-name") - .build() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 5e6e666f..1f07c233 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,7 +24,13 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.tests.fakes.* +import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload +import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload import reactor.core.publisher.Flux import java.time.Duration @@ -38,7 +44,10 @@ object VesHvSpecification : Spek({ describe("VES High Volume Collector") { it("should handle multiple HV RAN events") { val (sut, sink) = vesHvWithStoringSink() - val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, + vesWireFrameMessage(Domain.HVRANMEAS), + vesWireFrameMessage(Domain.HVRANMEAS) + ) assertThat(messages) .describedAs("should send all events") @@ -47,9 +56,9 @@ object VesHvSpecification : Spek({ it("should not handle messages received from client after end-of-transmission message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesMessage(Domain.HVRANMEAS) - val anotherValidMessage = vesMessage(Domain.HVRANMEAS) - val endOfTransmissionMessage = endOfTransmissionMessage() + val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val endOfTransmissionMessage = endOfTransmissionWireMessage() val handledEvents = sut.handleConnection(sink, validMessage, @@ -73,8 +82,8 @@ object VesHvSpecification : Spek({ describe("Memory management") { it("should release memory for each handled and dropped message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesMessage(Domain.HVRANMEAS) - val msgWithInvalidDomain = vesMessage(Domain.OTHER) + val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER) val msgWithInvalidFrame = invalidWireFrame() val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS) val expectedRefCnt = 0 @@ -100,8 +109,8 @@ object VesHvSpecification : Spek({ it("should release memory for end-of-transmission message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesMessage(Domain.HVRANMEAS) - val endOfTransmissionMessage = endOfTransmissionMessage() + val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val endOfTransmissionMessage = endOfTransmissionWireMessage() val expectedRefCnt = 0 val handledEvents = sut.handleConnection(sink, @@ -120,8 +129,8 @@ object VesHvSpecification : Spek({ it("should release memory for each message with invalid payload") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesMessage(Domain.HVRANMEAS) - val msgWithInvalidPayload = invalidVesMessage() + val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload() val expectedRefCnt = 0 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload) @@ -139,7 +148,7 @@ object VesHvSpecification : Spek({ it("should release memory for each message with garbage frame") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 @@ -161,7 +170,7 @@ object VesHvSpecification : Spek({ it("should direct message to a topic by means of routing configuration") { val (sut, sink) = vesHvWithStoringSink() - val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] @@ -175,9 +184,9 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) val messages = sut.handleConnection(sink, - vesMessage(Domain.HVRANMEAS), - vesMessage(Domain.HEARTBEAT), - vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING)) + vesWireFrameMessage(Domain.HVRANMEAS), + vesWireFrameMessage(Domain.HEARTBEAT), + vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING)) assertThat(messages).describedAs("number of routed messages").hasSize(3) @@ -194,9 +203,9 @@ object VesHvSpecification : Spek({ it("should drop message if route was not found") { val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, - vesMessage(Domain.OTHER, "first"), - vesMessage(Domain.HVRANMEAS, "second"), - vesMessage(Domain.HEARTBEAT, "third")) + vesWireFrameMessage(Domain.OTHER, "first"), + vesWireFrameMessage(Domain.HVRANMEAS, "second"), + vesWireFrameMessage(Domain.HEARTBEAT, "third")) assertThat(messages).describedAs("number of routed messages").hasSize(1) @@ -228,12 +237,12 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithoutRouting) - val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) assertThat(messages).isEmpty() sut.configurationProvider.updateConfiguration(basicConfiguration) - val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] @@ -248,7 +257,7 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(basicConfiguration) - val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) assertThat(messages).hasSize(1) val firstMessage = messages[0] @@ -260,7 +269,7 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] @@ -283,7 +292,7 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) } }.doOnNext { - sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS)) + sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) }.then().block(defaultTimeout) @@ -314,7 +323,7 @@ object VesHvSpecification : Spek({ println("config changed") } } - .map { vesMessage(Domain.HVRANMEAS) } + .map { vesWireFrameMessage(Domain.HVRANMEAS) } sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) @@ -339,9 +348,9 @@ object VesHvSpecification : Spek({ val (sut, sink) = vesHvWithStoringSink() val handledMessages = sut.handleConnection(sink, - vesMessage(Domain.HVRANMEAS, "first"), - vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"), - vesMessage(Domain.HVRANMEAS, "third")) + vesWireFrameMessage(Domain.HVRANMEAS, "first"), + vesMessageWithTooBigPayload(Domain.HVRANMEAS), + vesWireFrameMessage(Domain.HVRANMEAS)) assertThat(handledMessages).hasSize(1) assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt deleted file mode 100644 index a63aa9d5..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt +++ /dev/null @@ -1,105 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import com.google.protobuf.ByteString -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import io.netty.buffer.PooledByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE -import org.onap.ves.VesEventV5 -import org.onap.ves.VesEventV5.VesEvent -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import java.util.* - -val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT - -fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = - allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB - - val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes - } - -fun endOfTransmissionMessage(): ByteBuf = - allocator.buffer().writeByte(0xAA) - - -fun invalidVesMessage(): ByteBuf = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB - - val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) - writeInt(invalidGpb.size) // ves event size in bytes - writeBytes(invalidGpb) - -} - -fun garbageFrame(): ByteBuf = allocator.buffer().run { - writeBytes("the meaning of life is &@)(*_!".toByteArray()) -} - -fun invalidWireFrame(): ByteBuf = allocator.buffer().run { - writeByte(0xFF) - writeByte(0x01) // version - writeByte(0x01) // content type = GPB -} - -fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = - allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB - - val gpb = vesEvent( - domain, - id, - ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) - ).toByteString().asReadOnlyByteBuffer() - - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes - } - -fun vesEvent(domain: Domain = Domain.HVRANMEAS, - id: String = UUID.randomUUID().toString(), - hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent = - VesEvent.newBuilder() - .setCommonEventHeader( - CommonEventHeader.getDefaultInstance().toBuilder() - .setVersion("1.0") - .setEventName("xyz") - .setEventId(id) - .setDomain(domain) - .setEventName("Sample event name") - .setSourceName("Sample Source") - .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) - .setPriority(CommonEventHeader.Priority.MEDIUM) - .setStartEpochMicrosec(120034455) - .setLastEpochMicrosec(120034459) - .setSequence(1)) - .setHvRanMeasFields(hvRanMeasFields) - .build() |