aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/pom.xml6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt8
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt54
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt15
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt29
5 files changed, 34 insertions, 78 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index cf99867f..06687b7d 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -73,6 +73,12 @@
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index fbff769f..80f62d1a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -22,7 +22,13 @@ package org.onap.dcae.collectors.veshv.impl.wire
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.*
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index eb8971c3..213f4544 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -19,17 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import com.google.protobuf.ByteString
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.toByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
@@ -37,22 +35,14 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
internal object MessageValidatorTest : Spek({
- fun vesMessageBytes(commonHeader: CommonEventHeader): ByteData {
- val msg = VesEvent.newBuilder()
- .setCommonEventHeader(commonHeader)
- .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
- .build()
- return msg.toByteData()
- }
-
given("Message validator") {
val cut = MessageValidator
on("ves hv message including header with fully initialized fields") {
- val commonHeader = createInitializedHeaderBuilder().build()
+ val commonHeader = commonHeader()
it("should accept message with fully initialized message header") {
- val vesMessage = VesMessage(commonHeader, vesMessageBytes(commonHeader))
+ val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
}
@@ -60,8 +50,8 @@ internal object MessageValidatorTest : Spek({
.filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) }
.forEach { domain ->
it("should accept message with $domain domain") {
- val header = newBuilder(commonHeader).setDomain(domain).build()
- val vesMessage = VesMessage(header, vesMessageBytes(header))
+ val header = commonHeader(domain)
+ val vesMessage = VesMessage(header, vesEventBytes(header))
assertThat(cut.isValid(vesMessage))
.isTrue()
}
@@ -83,10 +73,8 @@ internal object MessageValidatorTest : Spek({
domainTestCases.forEach { value, expectedResult ->
on("ves hv message including header with domain $value") {
- val commonEventHeader = createInitializedHeaderBuilder()
- .setDomain(value)
- .build()
- val vesMessage = VesMessage(commonEventHeader, vesMessageBytes(commonEventHeader))
+ val commonEventHeader = commonHeader(value)
+ val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
it("should resolve validation result") {
assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
@@ -102,10 +90,8 @@ internal object MessageValidatorTest : Spek({
priorityTestCases.forEach { value, expectedResult ->
on("ves hv message including header with priority $value") {
- val commonEventHeader = createInitializedHeaderBuilder()
- .setPriority(value)
- .build()
- val vesMessage = VesMessage(commonEventHeader, vesMessageBytes(commonEventHeader))
+ val commonEventHeader = commonHeader(priority = value)
+ val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
it("should resolve validation result") {
assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
@@ -114,7 +100,6 @@ internal object MessageValidatorTest : Spek({
}
}
-
on("ves hv message including header with not initialized fields") {
val commonHeader = newBuilder()
.setVersion("1.9")
@@ -122,11 +107,7 @@ internal object MessageValidatorTest : Spek({
.setEventId("Sample event Id")
.setSourceName("Sample Source")
.build()
- val msg = VesEvent.newBuilder()
- .setCommonEventHeader(commonHeader)
- .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!"))
- .build()
- val rawMessageBytes = msg.toByteData()
+ val rawMessageBytes = vesEventBytes(commonHeader)
it("should not accept not fully initialized message header ") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -135,16 +116,3 @@ internal object MessageValidatorTest : Spek({
}
}
})
-
-private fun createInitializedHeaderBuilder(): CommonEventHeader.Builder =
- newBuilder()
- .setVersion("1.9")
- .setEventName("Sample event name")
- .setDomain(Domain.HVRANMEAS)
- .setEventId("Sample event Id")
- .setSourceName("Sample Source")
- .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
- .setPriority(Priority.MEDIUM)
- .setStartEpochMicrosec(120034455)
- .setLastEpochMicrosec(120034459)
- .setSequence(2) \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 599a9d40..91fa7c19 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -30,7 +30,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
/**
@@ -56,7 +56,7 @@ object RouterTest : Spek({
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -77,7 +77,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -98,7 +98,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should not have route available") {
@@ -106,9 +106,4 @@ object RouterTest : Spek({
}
}
}
-})
-
-private fun vesCommonHeaderWithDomain(domain: Domain) =
- CommonEventHeader.getDefaultInstance().toBuilder()
- .setDomain(domain)
- .build() \ No newline at end of file
+}) \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 3f1f610e..a7d3971e 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -26,10 +26,10 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.toByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -41,12 +41,8 @@ internal object VesDecoderTest : Spek({
val cut = VesDecoder()
on("ves hv message bytes") {
- val commonHeader = commonEventHeader()
- val msg = VesEvent.newBuilder()
- .setCommonEventHeader(commonHeader)
- .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
- .build()
- val rawMessageBytes = msg.toByteData()
+ val commonHeader = commonHeader(Domain.HEARTBEAT)
+ val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
it("should decode only header and pass it on along with raw message") {
val expectedMessage = VesMessage(
@@ -76,18 +72,3 @@ private fun <A> assertFailedWithError(option: Option<A>) =
option.exists {
fail("Error expected")
}
-
-
-private fun commonEventHeader() =
- CommonEventHeader.getDefaultInstance().toBuilder()
- .setDomain(CommonEventHeader.Domain.HEARTBEAT)
- .setVersion("1.0")
- .setEventName("xyz")
- .setEventId("eventID")
- .setEventName("Sample event name")
- .setSourceName("Sample Source")
- .setPriority(CommonEventHeader.Priority.MEDIUM)
- .setStartEpochMicrosec(120034455)
- .setLastEpochMicrosec(120034459)
- .setSequence(1)
- .build()