aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct/src
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct/src')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt27
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt63
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt105
3 files changed, 40 insertions, 155 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 928c62fb..1e22d4c0 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.tests.component
import arrow.syntax.function.partially1
-import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
@@ -33,17 +32,16 @@ import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventV5
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
import java.time.Duration
-import java.util.*
+import java.util.Random
import kotlin.system.measureTimeMillis
/**
@@ -64,7 +62,7 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- commonEventHeader = createSampleCommonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVRANMEAS),
messageType = VALID,
amount = numMessages
)
@@ -94,7 +92,7 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- commonEventHeader = createSampleCommonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVRANMEAS),
messageType = VALID,
amount = numMessages
)
@@ -203,20 +201,3 @@ private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
}
}
-private fun createSampleCommonHeader(domain: Domain): VesEventV5.VesEvent.CommonEventHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
- .setVersion("sample-version")
- .setDomain(domain)
- .setSequence(1)
- .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL)
- .setEventId("sample-event-id")
- .setEventName("sample-event-name")
- .setEventType("sample-event-type")
- .setStartEpochMicrosec(120034455)
- .setLastEpochMicrosec(120034455)
- .setNfNamingCode("sample-nf-naming-code")
- .setNfcNamingCode("sample-nfc-naming-code")
- .setReportingEntityId("sample-reporting-entity-id")
- .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
- .setSourceId(ByteString.copyFromUtf8("sample-source-id"))
- .setSourceName("sample-source-name")
- .build()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 5e6e666f..1f07c233 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,7 +24,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
import reactor.core.publisher.Flux
import java.time.Duration
@@ -38,7 +44,10 @@ object VesHvSpecification : Spek({
describe("VES High Volume Collector") {
it("should handle multiple HV RAN events") {
val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(Domain.HVRANMEAS),
+ vesWireFrameMessage(Domain.HVRANMEAS)
+ )
assertThat(messages)
.describedAs("should send all events")
@@ -47,9 +56,9 @@ object VesHvSpecification : Spek({
it("should not handle messages received from client after end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
- val endOfTransmissionMessage = endOfTransmissionMessage()
+ val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionWireMessage()
val handledEvents = sut.handleConnection(sink,
validMessage,
@@ -73,8 +82,8 @@ object VesHvSpecification : Spek({
describe("Memory management") {
it("should release memory for each handled and dropped message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val msgWithInvalidDomain = vesMessage(Domain.OTHER)
+ val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
val expectedRefCnt = 0
@@ -100,8 +109,8 @@ object VesHvSpecification : Spek({
it("should release memory for end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val endOfTransmissionMessage = endOfTransmissionMessage()
+ val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val endOfTransmissionMessage = endOfTransmissionWireMessage()
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(sink,
@@ -120,8 +129,8 @@ object VesHvSpecification : Spek({
it("should release memory for each message with invalid payload") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val msgWithInvalidPayload = invalidVesMessage()
+ val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
@@ -139,7 +148,7 @@ object VesHvSpecification : Spek({
it("should release memory for each message with garbage frame") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
@@ -161,7 +170,7 @@ object VesHvSpecification : Spek({
it("should direct message to a topic by means of routing configuration") {
val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
@@ -175,9 +184,9 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
val messages = sut.handleConnection(sink,
- vesMessage(Domain.HVRANMEAS),
- vesMessage(Domain.HEARTBEAT),
- vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+ vesWireFrameMessage(Domain.HVRANMEAS),
+ vesWireFrameMessage(Domain.HEARTBEAT),
+ vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
assertThat(messages).describedAs("number of routed messages").hasSize(3)
@@ -194,9 +203,9 @@ object VesHvSpecification : Spek({
it("should drop message if route was not found") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesMessage(Domain.OTHER, "first"),
- vesMessage(Domain.HVRANMEAS, "second"),
- vesMessage(Domain.HEARTBEAT, "third"))
+ vesWireFrameMessage(Domain.OTHER, "first"),
+ vesWireFrameMessage(Domain.HVRANMEAS, "second"),
+ vesWireFrameMessage(Domain.HEARTBEAT, "third"))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
@@ -228,12 +237,12 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
assertThat(messages).isEmpty()
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
@@ -248,7 +257,7 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
@@ -260,7 +269,7 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
@@ -283,7 +292,7 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
}
}.doOnNext {
- sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+ sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
}.then().block(defaultTimeout)
@@ -314,7 +323,7 @@ object VesHvSpecification : Spek({
println("config changed")
}
}
- .map { vesMessage(Domain.HVRANMEAS) }
+ .map { vesWireFrameMessage(Domain.HVRANMEAS) }
sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
@@ -339,9 +348,9 @@ object VesHvSpecification : Spek({
val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
- vesMessage(Domain.HVRANMEAS, "first"),
- vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
- vesMessage(Domain.HVRANMEAS, "third"))
+ vesWireFrameMessage(Domain.HVRANMEAS, "first"),
+ vesMessageWithTooBigPayload(Domain.HVRANMEAS),
+ vesWireFrameMessage(Domain.HVRANMEAS))
assertThat(handledMessages).hasSize(1)
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
deleted file mode 100644
index a63aa9d5..00000000
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.component
-
-import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.ves.VesEventV5
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import java.util.*
-
-val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
-
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
- allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
-
- val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
- }
-
-fun endOfTransmissionMessage(): ByteBuf =
- allocator.buffer().writeByte(0xAA)
-
-
-fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
-
- val invalidGpb = "some random data".toByteArray(Charsets.UTF_8)
- writeInt(invalidGpb.size) // ves event size in bytes
- writeBytes(invalidGpb)
-
-}
-
-fun garbageFrame(): ByteBuf = allocator.buffer().run {
- writeBytes("the meaning of life is &@)(*_!".toByteArray())
-}
-
-fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
- writeByte(0xFF)
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
-}
-
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf =
- allocator.buffer().run {
- writeByte(0xFF) // always 0xFF
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
-
- val gpb = vesEvent(
- domain,
- id,
- ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
- ).toByteString().asReadOnlyByteBuffer()
-
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
- }
-
-fun vesEvent(domain: Domain = Domain.HVRANMEAS,
- id: String = UUID.randomUUID().toString(),
- hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent =
- VesEvent.newBuilder()
- .setCommonEventHeader(
- CommonEventHeader.getDefaultInstance().toBuilder()
- .setVersion("1.0")
- .setEventName("xyz")
- .setEventId(id)
- .setDomain(domain)
- .setEventName("Sample event name")
- .setSourceName("Sample Source")
- .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
- .setPriority(CommonEventHeader.Priority.MEDIUM)
- .setStartEpochMicrosec(120034455)
- .setLastEpochMicrosec(120034459)
- .setSequence(1))
- .setHvRanMeasFields(hvRanMeasFields)
- .build()